Omaha #4128: Refactor ISC and active table sharing code so that their beans perform cluster failover together.

Change-Id: Ib97d20bdd26b413648727101da26fee8449cb7ba

Former-commit-id: d242c9cc22d53dca873e62787f6ab3234dd2b7df
This commit is contained in:
David Gillingham 2015-03-11 17:47:38 -05:00
parent 40fdfa4495
commit e713928f7c
10 changed files with 705 additions and 609 deletions

View file

@ -5,7 +5,7 @@
<bean id="gfeSiteActivation" class="com.raytheon.edex.plugin.gfe.config.GFESiteActivation" <bean id="gfeSiteActivation" class="com.raytheon.edex.plugin.gfe.config.GFESiteActivation"
depends-on="commonTimeRegistered, gfeDbRegistered, levelFactoryInitialized"> depends-on="commonTimeRegistered, gfeDbRegistered, levelFactoryInitialized">
<constructor-arg ref="fetchATSrv" /> <constructor-arg ref="iscProvider" />
</bean> </bean>
<bean id="gfeSitesActiveRequest" factory-bean="siteAwareRegistry" factory-method="register"> <bean id="gfeSitesActiveRequest" factory-bean="siteAwareRegistry" factory-method="register">
@ -588,22 +588,32 @@
<constructor-arg ref="clusteredGfeIscRoutes"/> <constructor-arg ref="clusteredGfeIscRoutes"/>
</bean> </bean>
<!-- Active Table Sharing Definitions --> <!-- ISC Services Beans -->
<bean id="fetchATSrv" class="com.raytheon.edex.plugin.gfe.isc.FetchActiveTableSrv" /> <bean id="iscProvider" class="com.raytheon.edex.plugin.gfe.isc.IscServiceProvider" />
<bean factory-bean="contextManager" factory-method="registerContextStateProcessor"> <bean id="fetchATSrv" class="com.raytheon.edex.plugin.gfe.isc.FetchActiveTableSrv" />
<constructor-arg ref="activeTableSharingRoutes"/> <bean factory-bean="iscProvider" factory-method="addISCService">
<constructor-arg ref="fetchATSrv"/> <constructor-arg ref="fetchATSrv"/>
</bean> </bean>
<camelContext id="activeTableSharingRoutes" xmlns="http://camel.apache.org/schema/spring" <bean id="requestTCVSrv" class="com.raytheon.edex.plugin.gfe.isc.RequestTCVSrv" />
<bean factory-bean="iscProvider" factory-method="addISCService">
<constructor-arg ref="requestTCVSrv"/>
</bean>
<bean factory-bean="contextManager" factory-method="registerContextStateProcessor">
<constructor-arg ref="clusteredIscBeans" />
<constructor-arg ref="iscProvider" />
</bean>
<camelContext id="clusteredIscBeans" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"> errorHandlerRef="errorHandler">
<route id="activateFetchATSrv"> <route id="activateISC">
<from uri="timer://activateActiveTableSharing?repeatCount=1"/> <from uri="timer://activateISCServices?repeatCount=1"/>
<bean ref="fetchATSrv" method="activateService"/> <bean ref="iscProvider" method="activateInstance"/>
</route> </route>
</camelContext> </camelContext>
<bean factory-bean="contextManager" factory-method="registerClusteredContext"> <bean factory-bean="contextManager" factory-method="registerClusteredContext">
<constructor-arg ref="activeTableSharingRoutes"/> <constructor-arg ref="clusteredIscBeans"/>
</bean> </bean>
</beans> </beans>

View file

@ -21,41 +21,22 @@ package com.raytheon.edex.plugin.gfe.config;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import jep.JepException;
import com.google.common.util.concurrent.MoreExecutors;
import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException; import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
import com.raytheon.edex.plugin.gfe.exception.GfeMissingConfigurationException; import com.raytheon.edex.plugin.gfe.exception.GfeMissingConfigurationException;
import com.raytheon.edex.plugin.gfe.isc.FetchActiveTableSrv; import com.raytheon.edex.plugin.gfe.isc.IscServiceProvider;
import com.raytheon.edex.plugin.gfe.isc.IRTManager;
import com.raytheon.edex.plugin.gfe.server.IFPServer; import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.edex.site.SiteUtil; import com.raytheon.edex.site.SiteUtil;
import com.raytheon.uf.common.dataplugin.PluginException; import com.raytheon.uf.common.dataplugin.PluginException;
import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException; import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.python.PyUtil;
import com.raytheon.uf.common.python.PythonScript;
import com.raytheon.uf.common.site.notify.SiteActivationNotification; import com.raytheon.uf.common.site.notify.SiteActivationNotification;
import com.raytheon.uf.common.site.notify.SiteActivationNotification.ACTIVATIONSTATUS; import com.raytheon.uf.common.site.notify.SiteActivationNotification.ACTIVATIONSTATUS;
import com.raytheon.uf.common.site.notify.SiteActivationNotification.ACTIVATIONTYPE; import com.raytheon.uf.common.site.notify.SiteActivationNotification.ACTIVATIONTYPE;
import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.activetable.ActiveTablePyIncludeUtil;
import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException; import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.database.DataAccessLayerException; import com.raytheon.uf.edex.database.DataAccessLayerException;
@ -94,6 +75,7 @@ import com.raytheon.uf.edex.site.notify.SendSiteActivationNotifications;
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up * Oct 07, 2014 #3684 randerso Restructured IFPServer start up
* Dec 10, 2014 #4953 randerso Added requestTCVFiles call at site activation * Dec 10, 2014 #4953 randerso Added requestTCVFiles call at site activation
* Feb 25, 2015 #4128 dgilling Simplify activation of active table sharing. * Feb 25, 2015 #4128 dgilling Simplify activation of active table sharing.
* Mar 11, 2015 #4128 dgilling Refactor activation and management of ISC services.
* *
* </pre> * </pre>
* *
@ -113,9 +95,7 @@ public class GFESiteActivation implements ISiteActivationListener {
private boolean intialized; private boolean intialized;
private final ExecutorService postActivationTaskExecutor; private final IscServiceProvider iscServices;
private final FetchActiveTableSrv fetchAtSrv;
/** /**
* Default constructor. Builds a GFESiteActivation instance with no * Default constructor. Builds a GFESiteActivation instance with no
@ -127,17 +107,14 @@ public class GFESiteActivation implements ISiteActivationListener {
/** /**
* Builds a GFESiteActivation instance with an associated * Builds a GFESiteActivation instance with an associated
* {@code FetchActiveTableSrv} instance. Should only be used on request JVM. * {@code IscServiceProvider} instance. Should only be used on request JVM.
* *
* @param fetchAtSrv * @param iscServices
* {@code FetchActiveTableSrv} instance * {@code IscServiceProvider} instance
*/ */
public GFESiteActivation(final FetchActiveTableSrv fetchAtSrv) { public GFESiteActivation(final IscServiceProvider iscServices) {
this.intialized = false; this.intialized = false;
this.postActivationTaskExecutor = MoreExecutors this.iscServices = iscServices;
.getExitingExecutorService((ThreadPoolExecutor) Executors
.newCachedThreadPool());
this.fetchAtSrv = fetchAtSrv;
} }
@Override @Override
@ -330,63 +307,8 @@ public class GFESiteActivation implements ISiteActivationListener {
ClusterLockUtils.unlock(ct, false); ClusterLockUtils.unlock(ct, false);
} }
// Doesn't need to be cluster locked if (iscServices != null) {
statusHandler.info("Checking ISC configuration..."); iscServices.activateSite(siteID, config);
if (config.requestISC()) {
String host = InetAddress.getLocalHost().getCanonicalHostName();
String gfeHost = config.getServerHost();
String hostNameToCompare = gfeHost;
if (gfeHost.endsWith("f")) {
hostNameToCompare = gfeHost.substring(0, gfeHost.length() - 1);
}
// TODO: specific to a host and jvm type, register it independently,
// but don't hard code request
if (host.contains(hostNameToCompare)
&& System.getProperty("edex.run.mode").equals("request")) {
statusHandler.info("Enabling ISC...");
try {
IRTManager.getInstance().enableISC(siteID, config);
final IFPServerConfig configRef = config;
if (configRef.tableFetchTime() > 0) {
Runnable activateTableSharing = new Runnable() {
@Override
public void run() {
EDEXUtil.waitForRunning();
fetchAtSrv.activateSite(siteID, configRef);
}
};
postActivationTaskExecutor.submit(activateTableSharing);
}
Runnable requestTCV = new Runnable() {
@Override
public void run() {
// wait until EDEX is up and running to request TCV
// files
EDEXUtil.waitForRunning();
requestTCVFiles(siteID, configRef);
}
};
postActivationTaskExecutor.submit(requestTCV);
} catch (Exception e) {
statusHandler
.error("Error starting GFE ISC. ISC functionality will be unavailable!!",
e);
}
} else {
statusHandler
.info("ISC Enabled but will use another EDEX instance");
}
} else {
statusHandler.info("ISC is not enabled.");
} }
statusHandler.info("Adding " + siteID + " to active sites list."); statusHandler.info("Adding " + siteID + " to active sites list.");
@ -423,6 +345,10 @@ public class GFESiteActivation implements ISiteActivationListener {
} }
if (iscServices != null) {
iscServices.deactivateSite(siteID);
}
IFPServer.deactivateServer(siteID); IFPServer.deactivateServer(siteID);
statusHandler.info(siteID + " successfully deactivated"); statusHandler.info(siteID + " successfully deactivated");
@ -488,46 +414,4 @@ public class GFESiteActivation implements ISiteActivationListener {
} }
return retVal; return retVal;
} }
private void requestTCVFiles(String siteId, IFPServerConfig config) {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext commonBaseCx = pathMgr.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
String scriptPath = pathMgr.getFile(commonBaseCx,
FileUtil.join(ActiveTablePyIncludeUtil.VTEC, "requestTCV.py"))
.getPath();
String pythonIncludePath = PyUtil.buildJepIncludePath(
ActiveTablePyIncludeUtil.getCommonPythonIncludePath(),
ActiveTablePyIncludeUtil.getCommonGfeIncludePath(),
ActiveTablePyIncludeUtil.getVtecIncludePath(siteId),
ActiveTablePyIncludeUtil.getGfeConfigIncludePath(siteId),
ActiveTablePyIncludeUtil.getIscScriptsIncludePath());
PythonScript script = null;
try {
script = new PythonScript(scriptPath, pythonIncludePath, this
.getClass().getClassLoader());
try {
Map<String, Object> argMap = new HashMap<String, Object>();
argMap.put("siteID", siteId);
argMap.put("config", config);
script.execute("runFromJava", argMap);
} catch (JepException e) {
statusHandler.handle(Priority.PROBLEM,
"Error executing requestTCV.", e);
}
} catch (JepException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to instantiate requestTCV python script object.",
e);
} finally {
if (script != null) {
script.dispose();
}
}
}
} }

View file

@ -30,12 +30,11 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig; import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority; import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.RunProcess; import com.raytheon.uf.common.util.RunProcess;
import com.raytheon.uf.edex.core.IContextStateProcessor; import com.raytheon.uf.edex.core.EDEXUtil;
/** /**
* Service that fetches neighboring sites' active table entries that are * Service that fetches neighboring sites' active table entries that are
@ -53,6 +52,8 @@ import com.raytheon.uf.edex.core.IContextStateProcessor;
* fetching when site is deactivated. * fetching when site is deactivated.
* Feb 26, 2015 #4128 dgilling Moved to edex.gfe plugin, rewritten as * Feb 26, 2015 #4128 dgilling Moved to edex.gfe plugin, rewritten as
* IContextStateProcessor. * IContextStateProcessor.
* Mar 11, 2015 #4128 dgilling Ensure this service runs on same cluster
* node as was registered with IRT.
* *
* </pre> * </pre>
* *
@ -60,7 +61,7 @@ import com.raytheon.uf.edex.core.IContextStateProcessor;
* @version 1.0 * @version 1.0
*/ */
public final class FetchActiveTableSrv implements IContextStateProcessor { public final class FetchActiveTableSrv implements IISCServiceBean {
private static final class FetchATJobConfig { private static final class FetchATJobConfig {
@ -165,31 +166,21 @@ public final class FetchActiveTableSrv implements IContextStateProcessor {
private ScheduledExecutorService jobExecutor; private ScheduledExecutorService jobExecutor;
private volatile boolean activeServiceInstance;
/** /**
* Default constructor. * Default constructor.
*/ */
public FetchActiveTableSrv() { public FetchActiveTableSrv() {
this.activeServiceInstance = false;
this.siteJobInstanceMap = new ConcurrentHashMap<String, ScheduledFuture<?>>(); this.siteJobInstanceMap = new ConcurrentHashMap<String, ScheduledFuture<?>>();
} }
/** /*
* Dummy trigger method for the timer in the camel context this bean * (non-Javadoc)
* monitors. Ensures this bean properly fails over between cluster members
* as needed.
*/
public void activateService() {
activeServiceInstance = true;
}
/**
* Removes a site's active table sharing job from the job pool.
* *
* @param siteID * @see
* Site identifier for the site's job to stop. * com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#deactivateSite(java.
* lang.String)
*/ */
@Override
public void deactivateSite(final String siteID) { public void deactivateSite(final String siteID) {
ScheduledFuture<?> siteJob = siteJobInstanceMap.remove(siteID); ScheduledFuture<?> siteJob = siteJobInstanceMap.remove(siteID);
if (siteJob != null) { if (siteJob != null) {
@ -198,17 +189,19 @@ public final class FetchActiveTableSrv implements IContextStateProcessor {
} }
} }
/** /*
* Adds a site's active table sharing job to the job pool. * (non-Javadoc)
* *
* @param siteID * @see
* Site identifier for the site's job to add to job pool. * com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#activateSite(java.lang
* @param gfeConfig * .String, com.raytheon.edex.plugin.gfe.config.IFPServerConfig)
* {@code IFPServerConfig} for the site.
*/ */
@Override
public void activateSite(final String siteID, public void activateSite(final String siteID,
final IFPServerConfig gfeConfig) { final IFPServerConfig gfeConfig) {
if (activeServiceInstance && (!siteJobInstanceMap.containsKey(siteID))) { EDEXUtil.waitForRunning();
if ((gfeConfig.tableFetchTime() > 0)
&& (!siteJobInstanceMap.containsKey(siteID))) {
FetchATJobConfig jobConfig = new FetchATJobConfig(siteID, gfeConfig); FetchATJobConfig jobConfig = new FetchATJobConfig(siteID, gfeConfig);
statusHandler.info("Activating FetchAT for " + siteID); statusHandler.info("Activating FetchAT for " + siteID);
@ -226,6 +219,10 @@ public final class FetchActiveTableSrv implements IContextStateProcessor {
"Unable to submit fetchAT job for execution:", e); "Unable to submit fetchAT job for execution:", e);
siteJobInstanceMap.remove(siteID); siteJobInstanceMap.remove(siteID);
} }
} else {
statusHandler
.info("Skipping activation of active table sharing for site "
+ siteID);
} }
} }
@ -282,43 +279,23 @@ public final class FetchActiveTableSrv implements IContextStateProcessor {
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
* @see com.raytheon.uf.edex.core.IContextStateProcessor#preStart() * @see com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#startup()
*/ */
@Override @Override
public void preStart() { public void startup() {
statusHandler.info("Initializing FetchATSrv..."); statusHandler.info("Initializing FetchATSrv...");
activeServiceInstance = true;
jobExecutor = Executors.newScheduledThreadPool(1); jobExecutor = Executors.newScheduledThreadPool(1);
for (IFPServer ifpServer : IFPServer.getActiveServers()) {
IFPServerConfig config = ifpServer.getConfig();
if ((config.requestISC()) && (config.tableFetchTime() > 0)) {
activateSite(ifpServer.getSiteId(), config);
}
}
} }
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStart() * @see com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#preShutdown()
*/ */
@Override @Override
public void postStart() { public void preShutdown() {
// no op
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#preStop()
*/
@Override
public void preStop() {
statusHandler.info("Shutting down FetchATSrv..."); statusHandler.info("Shutting down FetchATSrv...");
activeServiceInstance = false;
if (jobExecutor != null) { if (jobExecutor != null) {
jobExecutor.shutdown(); jobExecutor.shutdown();
} }
@ -327,10 +304,10 @@ public final class FetchActiveTableSrv implements IContextStateProcessor {
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStop() * @see com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#postShutdown()
*/ */
@Override @Override
public void postStop() { public void postShutdown() {
jobExecutor = null; jobExecutor = null;
siteJobInstanceMap.clear(); siteJobInstanceMap.clear();
} }

View file

@ -26,15 +26,12 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jep.JepException; import jep.JepException;
import com.raytheon.edex.plugin.gfe.config.GridDbConfig; import com.raytheon.edex.plugin.gfe.config.GridDbConfig;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig; import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfigManager;
import com.raytheon.edex.plugin.gfe.server.IFPServer; import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID; import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.GridLocation; import com.raytheon.uf.common.dataplugin.gfe.db.objects.GridLocation;
import com.raytheon.uf.common.dataplugin.gfe.python.GfePyIncludeUtil; import com.raytheon.uf.common.dataplugin.gfe.python.GfePyIncludeUtil;
@ -59,21 +56,23 @@ import com.raytheon.uf.common.util.FileUtil;
* *
* Date Ticket# Engineer Description * Date Ticket# Engineer Description
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* 07/14/09 1995 bphillip Initial creation * Jul 14, 2009 1995 bphillip Initial creation
* Mar 14, 2013 1794 djohnson FileUtil.listFiles now returns List. * Mar 14, 2013 1794 djohnson FileUtil.listFiles now returns List.
* 06/13/13 2044 randerso Refactored to use IFPServer * Jun 13, 2013 2044 randerso Refactored to use IFPServer
* Sep 05, 2013 2307 dgilling Use better PythonScript constructor. * Sep 05, 2013 2307 dgilling Use better PythonScript constructor.
* Oct 16, 2013 2475 dgilling Move logic previously in IrtServer.py * Oct 16, 2013 2475 dgilling Move logic previously in IrtServer.py
* into this class to avoid Jep memory leak. * into this class to avoid Jep memory leak.
* Mar 11, 2015 4128 dgilling Refactored to match refactored IRTManager.
*
* </pre> * </pre>
* *
* @author bphillip * @author bphillip
* @version 1 * @version 1.0
*/ */
public class GfeIRT extends Thread { public final class GfeIRT implements Runnable {
/** The logger */ /** The logger */
private static final transient IUFStatusHandler statusHandler = UFStatus private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(GfeIRT.class); .getHandler(GfeIRT.class);
private static final String PYTHON_INSTANCE = "irt"; private static final String PYTHON_INSTANCE = "irt";
@ -103,30 +102,33 @@ public class GfeIRT extends Thread {
/** The Python script object */ /** The Python script object */
private PythonScript script; private PythonScript script;
/** private final IRTManager irtMgr;
* Map of threads used to unregister sites from the IRT server upon shutdown
*/ private final String ancfUrl;
private static Map<String, Thread> shutdownHooks = new ConcurrentHashMap<String, Thread>();
private final String bncfUrl;
/** /**
* Creates a new GfeIRT object for the provided site ID * Creates a new GfeIRT object for the provided site ID
* *
* @param siteID * @param siteid
* The site ID to create the GfeIRT object for * The site ID to create the GfeIRT object for
* @throws GfeConfigurationException * @param config
* If the GFE configuration for the specified site could not be * @param irtMgr
* loaded.
*/ */
public GfeIRT(String siteid, IFPServerConfig config) public GfeIRT(String siteid, IFPServerConfig config, IRTManager irtMgr) {
throws GfeConfigurationException {
this.setDaemon(true);
this.siteID = siteid; this.siteID = siteid;
this.mhsID = config.getMhsid(); this.mhsID = config.getMhsid();
this.irtMgr = irtMgr;
this.serverHost = config.getServerHost(); this.serverHost = config.getServerHost();
this.serverPort = config.getRpcPort(); this.serverPort = config.getRpcPort();
this.serverProtocol = config.getProtocolVersion(); this.serverProtocol = config.getProtocolVersion();
this.ancfUrl = config.iscRoutingTableAddress().get("ANCF");
this.bncfUrl = config.iscRoutingTableAddress().get("BNCF");
GridLocation domain = config.dbDomain(); GridLocation domain = config.dbDomain();
this.gridProj = domain.getProjection().getProjectionID().toString(); this.gridProj = domain.getProjection().getProjectionID().toString();
@ -188,23 +190,10 @@ public class GfeIRT extends Thread {
} }
config.setRequestedISCsites(this.iscWfosWanted); config.setRequestedISCsites(this.iscWfosWanted);
} }
Thread hook = new Thread() {
@Override
public void run() {
statusHandler.info("Unregistering site [" + siteID
+ "] from IRT server...");
IRTManager.getInstance().disableISC(mhsID, siteID);
statusHandler.info("Site [" + siteID + "] unregistered!");
}
};
java.lang.Runtime.getRuntime().addShutdownHook(hook);
shutdownHooks.put(mhsID + siteID, hook);
} }
@Override @Override
public void run() { public void run() {
try { try {
IPathManager pathMgr = PathManagerFactory.getPathManager(); IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext cx = pathMgr.getContext( LocalizationContext cx = pathMgr.getContext(
@ -215,30 +204,29 @@ public class GfeIRT extends Thread {
String includePath = PyUtil.buildJepIncludePath( String includePath = PyUtil.buildJepIncludePath(
GfePyIncludeUtil.getCommonPythonIncludePath(), GfePyIncludeUtil.getCommonPythonIncludePath(),
GfePyIncludeUtil.getIscScriptsIncludePath(), GfePyIncludeUtil.getIscScriptsIncludePath(),
GfePyIncludeUtil.getGfeConfigIncludePath(this.siteID)); GfePyIncludeUtil.getGfeConfigIncludePath(siteID));
this.script = new PythonScript(scriptPath, includePath, getClass() script = new PythonScript(scriptPath, includePath, getClass()
.getClassLoader()); .getClassLoader());
IFPServerConfig config = IFPServerConfigManager
.getServerConfig(siteID);
Map<String, Object> initArgs = new HashMap<String, Object>(2, 1f); Map<String, Object> initArgs = new HashMap<String, Object>(2, 1f);
initArgs.put("ancfURL", config.iscRoutingTableAddress().get("ANCF")); initArgs.put("ancfURL", ancfUrl);
initArgs.put("bncfURL", config.iscRoutingTableAddress().get("BNCF")); initArgs.put("bncfURL", bncfUrl);
this.script.instantiatePythonClass(PYTHON_INSTANCE, "IrtAccess", script.instantiatePythonClass(PYTHON_INSTANCE, "IrtAccess",
initArgs); initArgs);
} catch (GfeConfigurationException e) {
throw new RuntimeException("Could not load GFE configuration", e);
} catch (JepException e) { } catch (JepException e) {
throw new RuntimeException( statusHandler.error(
"Could not instantiate IRT python script instance", e); "Could not instantiate IRT python script instance for site "
+ siteID, e);
statusHandler.error("ISC is disabled for site " + siteID);
return;
} }
try { try {
// upon any overall failure, start thread over // upon any overall failure, start thread over
while (IRTManager.getInstance().isRegistered(mhsID, siteID)) { while (irtMgr.shouldRegister(siteID)) {
try { try {
// do initial registration, keep trying until successful // do initial registration, keep trying until successful
while (IRTManager.getInstance().isRegistered(mhsID, siteID)) { while (irtMgr.shouldRegister(siteID)) {
statusHandler statusHandler
.info("performing initial IRT registration."); .info("performing initial IRT registration.");
@ -259,11 +247,10 @@ public class GfeIRT extends Thread {
if (okay) { if (okay) {
break; break;
} else if (!IRTManager.getInstance().isRegistered( } else if (!irtMgr.shouldRegister(siteID)) {
mhsID, siteID)) {
break; // exit processing loop break; // exit processing loop
} else { } else {
sleep(3 * TimeUtil.MILLIS_PER_SECOND); Thread.sleep(3 * TimeUtil.MILLIS_PER_SECOND);
} }
} }
@ -271,8 +258,9 @@ public class GfeIRT extends Thread {
// for re-register every few seconds, check the StopIRT flag // for re-register every few seconds, check the StopIRT flag
// every few seconds // every few seconds
statusHandler.info("initial IRT registration complete."); statusHandler.info("initial IRT registration complete.");
while (IRTManager.getInstance().isRegistered(mhsID, siteID)) { while (irtMgr.shouldRegister(siteID)) {
sleep(3 * TimeUtil.MILLIS_PER_SECOND); // wait 3 seconds Thread.sleep(3 * TimeUtil.MILLIS_PER_SECOND); // wait 3
// seconds
Boolean status1 = (Boolean) script.execute( Boolean status1 = (Boolean) script.execute(
"checkForReregister", PYTHON_INSTANCE, null); "checkForReregister", PYTHON_INSTANCE, null);
@ -302,23 +290,4 @@ public class GfeIRT extends Thread {
} }
} }
} }
/**
* Removes the site's entry from the shutdown hook map
*
* @param mhsid
* The MHS ID of the site
* @param siteid
* The Site ID of the site
*/
public void removeShutdownHook(String mhsid, String siteid) {
if (shutdownHooks.containsKey(mhsid + siteid)) {
Thread hook = shutdownHooks.get(mhsid + siteid);
try {
Runtime.getRuntime().removeShutdownHook(hook);
} catch (IllegalStateException e) {
// Ignore. Shutdown in progress
}
}
}
} }

View file

@ -0,0 +1,94 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.gfe.isc;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
/**
* An interface for beans that provide ISC services for
* {@code IscServiceProvider}. This bean, once registered with
* {@code IscServiceProvider} will run on only one of the available EDEX cluster
* nodes. The {@code IscServiceProvider} ensures that dependent services all run
* together on the same node.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 11, 2015 dgilling Initial creation
*
* </pre>
*
* @author dgilling
* @version 1.0
*/
public interface IISCServiceBean {
/**
* Called upon activation of a new GFE site. Will only trigger on the
* cluster node currently hosting ISC services and if
* {@code gfeConfig.requestISC()} returns {@code true}.
*
* @param siteID
* Site identifier to activate ISC services for.
* @param gfeConfig
* Configuration data for this site.
*/
void activateSite(final String siteID, final IFPServerConfig gfeConfig);
/**
* Called upon deactivation of a GFE site. Will only trigger on the cluster
* node currently hosting ISC services.
*
* @param siteID
* Site identifier to deactivate ISC services for.
*/
void deactivateSite(final String siteID);
/**
* The startup method for this bean. Should be used to initialize heavy
* objects that shouldn't be allowed to run on both cluster nodes to
* conserve system resources.
*/
void startup();
/**
* Called to begin the shutdown process for this bean. Recommend using this
* method to cancel any asynchronous tasks this bean may be running.
* <p>
* Note that this method does not require that the startup method has
* previously been called so ensure this code does not rely on any behaviors
* of that method.
*/
void preShutdown();
/**
* Called after {@code IscServiceProvider} has completed its shutdown. One
* last chance to cleanup any resources in use by this bean.
* <p>
* Note that this method does not require that the startup method has
* previously been called so ensure this code does not rely on any behaviors
* of that method.
*/
void postShutdown();
}

View file

@ -20,15 +20,21 @@
package com.raytheon.edex.plugin.gfe.isc; package com.raytheon.edex.plugin.gfe.isc;
import java.lang.Thread.State; import java.util.ArrayList;
import java.util.Map; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log; import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.LogFactory; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig; import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException; import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/** /**
* Manages interactions for the IRT server used with the GFE ISC capability * Manages interactions for the IRT server used with the GFE ISC capability
@ -41,89 +47,138 @@ import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException;
* ------------ ---------- ----------- -------------------------- * ------------ ---------- ----------- --------------------------
* 08/10/09 1995 bphillip Initial creation * 08/10/09 1995 bphillip Initial creation
* 06/13/13 2044 randerso Refactored to use IFPServer * 06/13/13 2044 randerso Refactored to use IFPServer
* 03/11/15 4128 dgilling Refactored to use ISCServiceProvider.
* *
* </pre> * </pre>
* *
* @author bphillip * @author bphillip
* @version 1 * @version 1
*/ */
public class IRTManager { public final class IRTManager {
/** The logger */ /** The logger */
protected transient Log logger = LogFactory.getLog(getClass()); private final IUFStatusHandler statusHandler = UFStatus
.getHandler(IRTManager.class);
/** The singleton instance */
private static IRTManager instance;
/** Map of active IRT connections keyed by site */ /** Map of active IRT connections keyed by site */
private Map<String, GfeIRT> irtMap; private final ConcurrentMap<String, Future<?>> irtMap;
/** /** List of valid ISC sites. */
* Gets the singleton instance of the IRTManager private final Set<String> registeredSiteIDs;
*
* @return The singleton instance of the IRTManager
*/
public static synchronized IRTManager getInstance() {
if (instance == null) {
instance = new IRTManager();
}
return instance; private ExecutorService jobExecutor;
}
/** /**
* Constructs the singleton instance of the IRT Manager * Constructs the singleton instance of the IRT Manager
*/ */
private IRTManager() { public IRTManager() {
irtMap = new ConcurrentHashMap<String, GfeIRT>(); this.irtMap = new ConcurrentHashMap<>();
this.registeredSiteIDs = new CopyOnWriteArraySet<>();
} }
/** /**
* Enables ISC functionality for a site * Determines whether the specified site should continue to (re-) register
* with IRT.
* *
* @param siteID * @param siteID
* The site to activate ISC functionality for * Site identifier to check for.
* @param config * @return {@code true} if the site should continue registration with IRT.
* server configuration * {@code false} if not.
* @throws GfeException
* If the ISC functionality cannot be activated
*/ */
public void enableISC(String siteID, IFPServerConfig config) public boolean shouldRegister(final String siteID) {
throws GfeException { /*
* We use this separate Set to hold site IDs to avoid a race condition.
String mhsID = config.getMhsid(); * While it would be more convenient to use the keys of the irtMap to
if (!irtMap.containsKey(mhsID + "--" + siteID)) { * maintain the list of sites that should be attempting to register with
irtMap.put(mhsID + "--" + siteID, new GfeIRT(siteID, config)); * IRT, this will cause a race condition when the Runnable's attempt to
} * call this method. It's likely the job will hit the shouldRegister()
* check before the Future has been added to the Map and thus fail to
logger.info("Starting IRT registration thread for site [" + siteID * ever attempt registration with IRT.
+ "]"); */
irtMap.get(mhsID + "--" + siteID).start(); return registeredSiteIDs.contains(siteID);
} }
/** /**
* Disables ISC functionality for a site * Register the given site with the IRT server.
* *
* @param siteID * @param siteID
* The site to disable ISC functionality for * Site identifier for the site to register with the IRT server.
* @param gfeConfig
* The {@code IFPServerConfig} configuration data for the site.
*/ */
public void disableISC(String mhsID, String siteID) { public void activateSite(String siteID, IFPServerConfig gfeConfig) {
GfeIRT gfeIrt = null; if (!irtMap.containsKey(siteID)) {
String irtKey = mhsID + "--" + siteID; statusHandler.info("Starting IRT registration thread for site ["
gfeIrt = irtMap.remove(irtKey); + siteID + "]");
if (gfeIrt != null) {
if (gfeIrt.getState() != null) { registeredSiteIDs.add(siteID);
while (!gfeIrt.getState().equals(State.TERMINATED)) { Runnable job = constructJob(siteID, gfeConfig);
try {
Future<?> future = jobExecutor.submit(job);
irtMap.put(siteID, future);
} catch (RejectedExecutionException e) {
statusHandler.handle(Priority.PROBLEM,
"Unable to submit fetchAT job for execution:", e);
irtMap.remove(siteID);
} }
} }
// Remove the shutdown hook so an unregister is not attempted upon
// shutdown
gfeIrt.removeShutdownHook(mhsID, siteID);
}
} }
public boolean isRegistered(String mhsID, String siteID) { private GfeIRT constructJob(final String siteID,
boolean registered = irtMap.containsKey(mhsID + "--" + siteID); final IFPServerConfig config) {
return registered; return new GfeIRT(siteID, config, this);
}
/**
* Unregisters the given site with the IRT server.
*
* @param siteID
* Site identifier of the site to unregister.
*/
public void deactivateSite(String siteID) {
registeredSiteIDs.remove(siteID);
Future<?> job = irtMap.remove(siteID);
if (job != null) {
statusHandler.info("Deactivating IRT registration thread for "
+ siteID);
job.cancel(false);
}
}
/**
* Startup hook for this bean. Initializes job pool.
*/
public void preStart() {
statusHandler.info("Initializing IRTManager...");
jobExecutor = Executors.newCachedThreadPool();
}
/**
* Preliminary shutdown hook for this bean. Stops all running IRT
* registration jobs and initiates shutdown of the job pool.
*/
public void preStop() {
statusHandler.info("Shutting down IRTManager...");
Collection<String> siteIds = new ArrayList<>(registeredSiteIDs);
for (String siteId : siteIds) {
deactivateSite(siteId);
}
if (jobExecutor != null) {
jobExecutor.shutdown();
}
}
/**
* Shutdown completion hook for this bean. Clears all saved state for this
* bean.
*/
public void postStop() {
jobExecutor = null;
irtMap.clear();
registeredSiteIDs.clear();
} }
} }

View file

@ -0,0 +1,230 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.gfe.isc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.core.IContextStateProcessor;
/**
* An {@code IContextStateProcessor} implementation for ISC beans. Ensures that
* all ISC services are running on the same cluster node together.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 11, 2015 #4128 dgilling Initial creation
*
* </pre>
*
* @author dgilling
* @version 1.0
*/
public final class IscServiceProvider implements IContextStateProcessor {
private final IUFStatusHandler statusHandler = UFStatus
.getHandler(IscServiceProvider.class);
private volatile boolean activeInstance;
private final Collection<IISCServiceBean> iscBeans;
private final ExecutorService jobThreads;
private final IRTManager irtManager;
/**
* Default constructor.
*/
public IscServiceProvider() {
this.iscBeans = new ArrayList<>();
this.activeInstance = false;
this.jobThreads = Executors.newCachedThreadPool();
this.irtManager = new IRTManager();
}
/**
* Register a new {@code IISCServiceBean} instance with this class. Beans
* can only be added and never removed.
*
* @param iscServiceBean
* The bean to register.
* @return The bean that was registered.
*/
public IISCServiceBean addISCService(final IISCServiceBean iscServiceBean) {
iscBeans.add(iscServiceBean);
return iscServiceBean;
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#preStart()
*/
@Override
public void preStart() {
activeInstance = true;
irtManager.preStart();
for (IISCServiceBean iscBean : iscBeans) {
iscBean.startup();
}
for (IFPServer ifpServer : IFPServer.getActiveServers()) {
activateSite(ifpServer.getSiteId(), ifpServer.getConfig());
}
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStart()
*/
@Override
public void postStart() {
// no-op
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#preStop()
*/
@Override
public void preStop() {
irtManager.preStop();
for (IISCServiceBean iscBean : iscBeans) {
iscBean.preShutdown();
}
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStop()
*/
@Override
public void postStop() {
irtManager.postStop();
for (IISCServiceBean iscBean : iscBeans) {
iscBean.postShutdown();
}
}
/**
* Dummy trigger method for the timer in the camel context this bean
* monitors. Ensures this bean properly fails over between cluster members
* as needed.
*/
public void activateInstance() {
activeInstance = true;
}
/**
* Activate the ISC services for the given site. All registered beans will
* have their {@code activateSite} method called if this instance is running
* on the designated ISC cluster node.
*
* @param siteID
* Site identifier to activate ISC services for.
* @param config
* Configuration data for this site.
*/
public void activateSite(final String siteID, final IFPServerConfig config) {
statusHandler.info("Checking ISC configuration for site " + siteID);
if (config.requestISC()) {
if (activeInstance) {
statusHandler.info("Enabling ISC for site " + siteID);
irtManager.activateSite(siteID, config);
for (final IISCServiceBean bean : iscBeans) {
Runnable activationJob = new Runnable() {
@Override
public void run() {
try {
bean.activateSite(siteID, config);
} catch (Throwable t) {
statusHandler.error(
"Unhandled RuntimeException thrown while activating service "
+ bean.getClass()
+ " for site " + siteID, t);
}
}
};
jobThreads.submit(activationJob);
}
} else {
statusHandler
.info("ISC Enabled but will use another EDEX instance");
}
} else {
statusHandler.info("ISC is not enabled.");
}
}
/**
* Deactivates the ISC services for the given site. All registered beans
* will have their {@code deactivateSite} method called if this instance is
* running on the designated ISC cluster node.
*
* @param siteID
* Site identifier to deactivate ISC services for.
*/
public void deactivateSite(final String siteID) {
if (activeInstance) {
irtManager.deactivateSite(siteID);
for (final IISCServiceBean bean : iscBeans) {
Runnable deactivationJob = new Runnable() {
@Override
public void run() {
try {
bean.deactivateSite(siteID);
} catch (Throwable t) {
statusHandler.error(
"Unhandled RuntimeException thrown while deactivating service "
+ bean.getClass() + " for site "
+ siteID, t);
}
}
};
jobThreads.submit(deactivationJob);
}
}
}
}

View file

@ -0,0 +1,158 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.gfe.isc;
import java.util.HashMap;
import java.util.Map;
import jep.JepException;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.python.PyUtil;
import com.raytheon.uf.common.python.PythonScript;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.activetable.ActiveTablePyIncludeUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
/**
* Bean to run requestTCV at GFE site activation time.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 11, 2015 #4128 dgilling Initial creation
*
* </pre>
*
* @author dgilling
* @version 1.0
*/
public final class RequestTCVSrv implements IISCServiceBean {
private final IUFStatusHandler statusHandler = UFStatus
.getHandler(RequestTCVSrv.class);
/*
* (non-Javadoc)
*
* @see
* com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#activateSite(java.lang
* .String, com.raytheon.edex.plugin.gfe.config.IFPServerConfig)
*/
@Override
public void activateSite(String siteID, IFPServerConfig gfeConfig) {
EDEXUtil.waitForRunning();
requestTCVFiles(siteID, gfeConfig);
}
private void requestTCVFiles(String siteId, IFPServerConfig config) {
statusHandler.info("Running requestTCV for site " + siteId);
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext commonBaseCx = pathMgr.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
String scriptPath = pathMgr.getFile(commonBaseCx,
FileUtil.join(ActiveTablePyIncludeUtil.VTEC, "requestTCV.py"))
.getPath();
String pythonIncludePath = PyUtil.buildJepIncludePath(
ActiveTablePyIncludeUtil.getCommonPythonIncludePath(),
ActiveTablePyIncludeUtil.getCommonGfeIncludePath(),
ActiveTablePyIncludeUtil.getVtecIncludePath(siteId),
ActiveTablePyIncludeUtil.getGfeConfigIncludePath(siteId),
ActiveTablePyIncludeUtil.getIscScriptsIncludePath());
PythonScript script = null;
try {
script = new PythonScript(scriptPath, pythonIncludePath, this
.getClass().getClassLoader());
try {
Map<String, Object> argMap = new HashMap<String, Object>();
argMap.put("siteID", siteId);
argMap.put("config", config);
script.execute("runFromJava", argMap);
} catch (JepException e) {
statusHandler.error("Error executing requestTCV.", e);
}
} catch (JepException e) {
statusHandler
.error("Unable to instantiate requestTCV python script object.",
e);
} finally {
if (script != null) {
script.dispose();
}
}
}
/*
* (non-Javadoc)
*
* @see
* com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#deactivateSite(java.
* lang.String)
*/
@Override
public void deactivateSite(String siteID) {
// no-op
}
/*
* (non-Javadoc)
*
* @see com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#startup()
*/
@Override
public void startup() {
// no-op
}
/*
* (non-Javadoc)
*
* @see com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#preShutdown()
*/
@Override
public void preShutdown() {
// no-op
}
/*
* (non-Javadoc)
*
* @see com.raytheon.edex.plugin.gfe.isc.IISCServiceBean#postShutdown()
*/
@Override
public void postShutdown() {
// no-op
}
}

View file

@ -30,7 +30,6 @@ import java.util.Set;
import com.raytheon.edex.plugin.gfe.cache.gridlocations.GridLocationCache; import com.raytheon.edex.plugin.gfe.cache.gridlocations.GridLocationCache;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig; import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.db.dao.IscSendRecordDao; import com.raytheon.edex.plugin.gfe.db.dao.IscSendRecordDao;
import com.raytheon.edex.plugin.gfe.isc.IRTManager;
import com.raytheon.edex.plugin.gfe.reference.MapManager; import com.raytheon.edex.plugin.gfe.reference.MapManager;
import com.raytheon.edex.plugin.gfe.server.database.NetCDFDatabaseManager; import com.raytheon.edex.plugin.gfe.server.database.NetCDFDatabaseManager;
import com.raytheon.edex.plugin.gfe.server.database.TopoDatabaseManager; import com.raytheon.edex.plugin.gfe.server.database.TopoDatabaseManager;
@ -46,7 +45,6 @@ import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
import com.raytheon.uf.common.dataplugin.satellite.SatelliteRecord; import com.raytheon.uf.common.dataplugin.satellite.SatelliteRecord;
import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.database.DataAccessLayerException; import com.raytheon.uf.edex.database.DataAccessLayerException;
/** /**
@ -63,6 +61,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
* May 30, 2013 #2044 randerso Initial creation * May 30, 2013 #2044 randerso Initial creation
* Nov 20, 2013 #2331 randerso Added getTopoData method * Nov 20, 2013 #2331 randerso Added getTopoData method
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up * Oct 07, 2014 #3684 randerso Restructured IFPServer start up
* Mar 11, 2015 #4128 dgilling Remove unregister from IRT to IRTManager.
* *
* </pre> * </pre>
* *
@ -210,16 +209,11 @@ public class IFPServer {
} }
private void dispose() { private void dispose() {
if (config.requestISC()) {
IRTManager.getInstance().disableISC(config.getMhsid(), siteId);
}
try { try {
new IscSendRecordDao().deleteForSite(siteId); new IscSendRecordDao().deleteForSite(siteId);
} catch (DataAccessLayerException e) { } catch (DataAccessLayerException e) {
statusHandler.handle(Priority.PROBLEM, statusHandler.error("Could not clear IscSendRecords for site "
"Could not clear IscSendRecords for site " + siteId + siteId + " from queue.", e);
+ " from queue.", e);
} }
// TODO necessary? // TODO necessary?

View file

@ -1,275 +0,0 @@
#!/usr/bin/env python
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
from __future__ import with_statement
import getopt, sys, os, time, socket, threading, SocketServer, LogStream, select
import cPickle, stat, tempfile
import iscDataRec
from com.raytheon.edex.plugin.gfe.isc import IRTManager
#
# Port of msg_send script
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 07/06/09 1995 bphillip Initial Creation.
#
#
#
ISC_DATA_REC = "iscDataRec" # Assume it is on the PATH
class Server(object):
class Handler(SocketServer.StreamRequestHandler):
def handle(self):
func = cPickle.load(self.rfile)
print "func: ", func
if func == 'gettable':
with self.server.tlock:
msg = cPickle.dumps(self.server.table)
self.wfile.write(msg)
self.wfile.flush()
elif func == 'msg_send':
meta = cPickle.load(self.rfile)
print "meta: ", meta
nfiles = cPickle.load(self.rfile)
print "nfiles: ", nfiles
fnames = []
while nfiles > 0:
size = cPickle.load(self.rfile)
print "reading: ", size
fname = tempfile.mktemp()
fnames.append(fname)
fpout = open(fname, 'wb')
while size > 0:
buf = self.rfile.read(min(4096, size))
fpout.write(buf)
size = size - len(buf)
sys.stdout.write('.')
print "done size: ", size
fpout.close()
nfiles = nfiles - 1
iscDataRec.execIscDataRec(os.path.basename(fname), meta['subject'], fnames)
fnames.append(fname)
def __init__(self, wanid):
self.__shutdown = False
self.wanid = wanid
self.table = {}
self.tlock = threading.Lock()
self.tscan = time.time()
def discover(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("0.0.0.0", 10000))
poll_interval = 1.0
while not self.__shutdown:
r, w, e = select.select([s], [], [], poll_interval)
if r:
data, addr = s.recvfrom(1024, socket.MSG_WAITALL)
if not data:
break
data = cPickle.loads(data)
if data[0] == "tcpaddr":
with self.tlock:
self.table[data[1]] = (addr[0], data[2], data[3])
now = time.time()
if now - self.tscan > 15:
dlist = []
for k, v in self.table.iteritems():
if now - v[2] > 15:
dlist.append(k)
for k in dlist:
del self.table[k]
self.tscan = now
#print "received packet: ", self.table
def broadcast(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.bind(("<broadcast>", 10000))
while not self.__shutdown:
# msg type, wanid, host, port, time of update.
msg = cPickle.dumps(("tcpaddr", self.wanid, self.addr[1],
time.time()))
s.sendto(msg, socket.MSG_WAITALL,
("<broadcast>", 10000))
time.sleep(5)
def startThreads(self):
LogStream.logEvent("Starting Threads")
self.t1 = threading.Thread(target=self.broadcast)
self.t1.setDaemon(True)
self.t1.start()
self.t2 = threading.Thread(target=self.discover)
self.t2.setDaemon(True)
self.t2.start()
def run(self):
LogStream.logEvent("Running with ShutDown Value:", self.__shutdown)
tcps = SocketServer.TCPServer(('', 0), self.Handler)
tcps.table = self.table
tcps.tlock = self.tlock
self.addr = tcps.server_address
self.startThreads()
poll_interval = 2.0
while not self.__shutdown:
r, w, e = select.select([tcps], [], [], poll_interval)
if r:
tcps.handle_request()
if IRTManager.getInstance().isRegistered(self.wanid) == False:
LogStream.logEvent("Shutting Down GFE Socket Server for site [", self.wanid, "]...")
self.__shutdown = True
LogStream.logEvent("Stopping Broadcast thread for site [", self.wanid, "]...")
self.t1.join()
LogStream.logEvent("Stopping Discovery thread for site [", self.wanid, "]...")
self.t2.join()
LogStream.logEvent("GFE Socket Server for site [", self.wanid, "] shut down")
def serve(wanid):
s = Server(wanid)
s.run()
def gettable():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("<broadcast>", 10000))
data, addr = s.recvfrom(1024, socket.MSG_WAITALL)
if not data:
pass # something bad happened.
data = cPickle.loads(data)
s.close()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((addr[0], data[2]))
s.sendall(cPickle.dumps("gettable"), socket.MSG_WAITALL)
data = s.recv(1024, socket.MSG_WAITALL)
data = cPickle.loads(data)
return data
def msg_send(addrs, files, meta):
table = gettable()
for addr in addrs:
if table.has_key(addr):
host, port, time = table[addr]
print "send to: ", host, port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
s.sendall(cPickle.dumps("msg_send"), socket.MSG_WAITALL)
s.sendall(cPickle.dumps(meta), socket.MSG_WAITALL)
s.sendall(cPickle.dumps(len(files)), socket.MSG_WAITALL)
fp = s.makefile('rw')
for f in files:
size = os.stat(f)[stat.ST_SIZE]
print "sending: ", f, size, "(bytes)"
s.sendall(cPickle.dumps(size), socket.MSG_WAITALL)
fpin = open(f, 'rb')
buf = fpin.read(4096)
while buf != "":
fp.write(buf)
buf = fpin.read(4096)
fpin.close()
fp.flush()
else:
print "No table entry for", addr,"in table"
print table
# typical args /awips/ops/bin/msg_send -s %SUBJECT -a %ADDRESSES -i %WMOID -c 11 -p 0 -e %ATTACHMENTS
def msg_send_main(args):
optlist, args = getopt.getopt(args, 'a:e:s:i:c:p:')
addrs = []
files = []
meta = {}
for opt in optlist:
k, v = opt
if k == '-a':
addrs = v.split(',')
elif k == '-e':
files = v.split(',')
elif k == '-s':
meta['subject'] = v
elif k == '-i':
meta['wmoid'] = v
msg_send(addrs, files, meta)
def usage(status=0):
print """Usage:
mmhs --help (This message)
mmhs --server=WANID --iscdr=/path/to/iscDataRec
(Start server mode where WANID is awips wan id)
msg_send [args] (run in msg_send mode [args are just like msg_send])
"""
sys.exit(status)
def main():
global ISC_DATA_REC
mode = os.path.basename(sys.argv[0])
if mode == 'msg_send':
msg_send_main(sys.argv[1:])
sys.exit(0)
# server mode
try:
optlist, args = getopt.getopt(sys.argv[1:], '',
['server=', 'iscdr='])
except (getopt.GetoptError, getopt.error):
usage(1)
mhid = ""
msg_args = []
for opt in optlist:
k, v = opt
if k == '--server':
mhid = v
elif k == '--iscdr':
ISC_DATA_REC = v
elif k == '--help':
usage()
else:
usage(1)
serve(mhid)
#if __name__ == "__main__":
# main()