Omaha #4128: Move FetchActiveTableSrv to edex.gfe plugin, fix cluster failover of service.

Change-Id: I99132eb075f3ccf4dd8b679d61c992e7029e1a31

Former-commit-id: 63139c2f38 [formerly 5d962bf6cf7a3e40ee152bc98817daf116633f60]
Former-commit-id: d7c0000508
This commit is contained in:
David Gillingham 2015-02-26 15:41:49 -06:00
parent 99cc0e8764
commit cdb695e517
11 changed files with 403 additions and 508 deletions

View file

@ -27,10 +27,6 @@
<constructor-arg ref="gfeDbPluginProperties"/>
</bean>
<bean id="gfeSiteActivation" class="com.raytheon.edex.plugin.gfe.config.GFESiteActivation" factory-method="getInstance"
depends-on="commonTimeRegistered, gfeDbRegistered, levelFactoryInitialized">
</bean>
<bean id="ifpServer" class="com.raytheon.edex.plugin.gfe.server.IFPServer.Wrapper"/>
<camelContext id="gfe-common-camel" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">

View file

@ -3,6 +3,11 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="gfeSiteActivation" class="com.raytheon.edex.plugin.gfe.config.GFESiteActivation"
depends-on="commonTimeRegistered, gfeDbRegistered, levelFactoryInitialized">
<constructor-arg ref="fetchATSrv" />
</bean>
<bean id="gfeSitesActiveRequest" factory-bean="siteAwareRegistry" factory-method="register">
<constructor-arg ref="gfeSiteActivation"/>
</bean>
@ -583,4 +588,23 @@
<bean factory-bean="contextManager" factory-method="registerClusteredContext">
<constructor-arg ref="clusteredGfeIscRoutes"/>
</bean>
<!-- Active Table Sharing Definitions -->
<bean id="fetchATSrv" class="com.raytheon.edex.plugin.gfe.isc.FetchActiveTableSrv" />
<bean factory-bean="contextManager" factory-method="registerContextStateProcessor">
<constructor-arg ref="activeTableSharingRoutes"/>
<constructor-arg ref="fetchATSrv"/>
</bean>
<camelContext id="activeTableSharingRoutes" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">
<route id="activateFetchATSrv">
<from uri="timer://activateActiveTableSharing?repeatCount=1"/>
<bean ref="fetchATSrv" method="activateService"/>
</route>
</camelContext>
<bean factory-bean="contextManager" factory-method="registerClusteredContext">
<constructor-arg ref="activeTableSharingRoutes"/>
</bean>
</beans>

View file

@ -6,6 +6,9 @@
<constructor-arg value="smartInit" />
<constructor-arg value="smartInit.*" />
</bean>
<bean id="gfeSiteActivation" class="com.raytheon.edex.plugin.gfe.config.GFESiteActivation"
depends-on="commonTimeRegistered, gfeDbRegistered, levelFactoryInitialized" />
<bean id="smartInitQueue" class="com.raytheon.edex.plugin.gfe.smartinit.SmartInitQueue" factory-method="createQueue"/>

View file

@ -35,6 +35,7 @@ import jep.JepException;
import com.google.common.util.concurrent.MoreExecutors;
import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
import com.raytheon.edex.plugin.gfe.exception.GfeMissingConfigurationException;
import com.raytheon.edex.plugin.gfe.isc.FetchActiveTableSrv;
import com.raytheon.edex.plugin.gfe.isc.IRTManager;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.edex.site.SiteUtil;
@ -92,6 +93,7 @@ import com.raytheon.uf.edex.site.notify.SendSiteActivationNotifications;
* Sent activation failure message to alertViz
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
* Dec 10, 2014 #4953 randerso Added requestTCVFiles call at site activation
* Feb 25, 2015 #4128 dgilling Simplify activation of active table sharing.
*
* </pre>
*
@ -109,25 +111,33 @@ public class GFESiteActivation implements ISiteActivationListener {
private static final int LOCK_TASK_TIMEOUT = 180000;
private static GFESiteActivation instance = new GFESiteActivation();
private boolean intialized;
private boolean intialized = false;
private final ExecutorService postActivationTaskExecutor;
private final ExecutorService postActivationTaskExecutor = MoreExecutors
.getExitingExecutorService((ThreadPoolExecutor) Executors
.newCachedThreadPool());
private final FetchActiveTableSrv fetchAtSrv;
/**
* @return the singleton instance
* Default constructor. Builds a GFESiteActivation instance with no
* associated {@code FetchActiveTableSrv} instance.
*/
public static GFESiteActivation getInstance() {
return instance;
public GFESiteActivation() {
this(null);
}
/**
* private constructor for singleton class
* Builds a GFESiteActivation instance with an associated
* {@code FetchActiveTableSrv} instance. Should only be used on request JVM.
*
* @param fetchAtSrv
* {@code FetchActiveTableSrv} instance
*/
private GFESiteActivation() {
public GFESiteActivation(final FetchActiveTableSrv fetchAtSrv) {
this.intialized = false;
this.postActivationTaskExecutor = MoreExecutors
.getExitingExecutorService((ThreadPoolExecutor) Executors
.newCachedThreadPool());
this.fetchAtSrv = fetchAtSrv;
}
@Override
@ -322,7 +332,6 @@ public class GFESiteActivation implements ISiteActivationListener {
// Doesn't need to be cluster locked
statusHandler.info("Checking ISC configuration...");
boolean isIscActivated = false;
if (config.requestISC()) {
String host = InetAddress.getLocalHost().getCanonicalHostName();
String gfeHost = config.getServerHost();
@ -337,14 +346,27 @@ public class GFESiteActivation implements ISiteActivationListener {
statusHandler.info("Enabling ISC...");
try {
IRTManager.getInstance().enableISC(siteID, config);
isIscActivated = true;
// wait until EDEX is up and running to request TCV files
final IFPServerConfig configRef = config;
if (configRef.tableFetchTime() > 0) {
Runnable activateTableSharing = new Runnable() {
@Override
public void run() {
EDEXUtil.waitForRunning();
fetchAtSrv.activateSite(siteID, configRef);
}
};
postActivationTaskExecutor.submit(activateTableSharing);
}
Runnable requestTCV = new Runnable() {
@Override
public void run() {
// wait until EDEX is up and running to request TCV
// files
EDEXUtil.waitForRunning();
requestTCVFiles(siteID, configRef);
@ -367,45 +389,6 @@ public class GFESiteActivation implements ISiteActivationListener {
statusHandler.info("ISC is not enabled.");
}
// doesn't need to be cluster locked
final IFPServerConfig configRef = config;
if ((config.tableFetchTime() > 0) && isIscActivated) {
Runnable activateFetchAT = new Runnable() {
@Override
public void run() {
EDEXUtil.waitForRunning();
Map<String, Object> fetchATConfig = new HashMap<String, Object>();
fetchATConfig.put("siteId", configRef.getSiteID().get(0));
fetchATConfig.put("interval", configRef.tableFetchTime());
fetchATConfig.put("ancf", configRef
.iscRoutingTableAddress().get("ANCF"));
fetchATConfig.put("bncf", configRef
.iscRoutingTableAddress().get("BNCF"));
fetchATConfig.put("serverHost", configRef.getServerHost());
fetchATConfig.put("port", configRef.getRpcPort());
fetchATConfig.put("protocolV",
configRef.getProtocolVersion());
fetchATConfig.put("mhsid", configRef.getMhsid());
fetchATConfig.put("transmitScript",
configRef.transmitScript());
try {
EDEXUtil.getMessageProducer().sendAsyncUri(
"jms-generic:queue:gfeSiteActivated",
fetchATConfig);
} catch (EdexException e) {
statusHandler.handle(Priority.PROBLEM,
"Could not activate active table sharing for site: "
+ siteID, e);
}
}
};
postActivationTaskExecutor.submit(activateFetchAT);
}
statusHandler.info("Adding " + siteID + " to active sites list.");
IFPServerConfigManager.addActiveSite(siteID);
statusHandler.info(siteID + " successfully activated");

View file

@ -0,0 +1,332 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.gfe.isc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.RunProcess;
import com.raytheon.uf.edex.core.IContextStateProcessor;
/**
* Service that fetches neighboring sites' active table entries that are
* relevant to this site using requestAT.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 28, 2013 dgilling Initial creation
* Feb 20, 2014 #2824 randerso Changed log level of message when activating FetchAT
* Registered with SiteAwareRegistry so we can stop
* fetching when site is deactivated.
* Feb 26, 2015 #4128 dgilling Moved to edex.gfe plugin, rewritten as
* IContextStateProcessor.
*
* </pre>
*
* @author dgilling
* @version 1.0
*/
public final class FetchActiveTableSrv implements IContextStateProcessor {
private static final class FetchATJobConfig {
private final String siteId;
private final long interval;
private final String ancfAddress;
private final String bncfAddress;
private final String serverHost;
private final String port;
private final String protocolV;
private final String mhsId;
private final String transmitScript;
public FetchATJobConfig(final String siteId,
final IFPServerConfig gfeConfig) {
this.siteId = siteId;
this.interval = gfeConfig.tableFetchTime();
this.ancfAddress = gfeConfig.iscRoutingTableAddress().get("ANCF");
this.bncfAddress = gfeConfig.iscRoutingTableAddress().get("BNCF");
this.serverHost = gfeConfig.getServerHost();
this.port = Long.toString(gfeConfig.getRpcPort());
this.protocolV = Long.toString(gfeConfig.getProtocolVersion());
this.mhsId = gfeConfig.getMhsid();
this.transmitScript = gfeConfig.transmitScript();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("FetchATJobConfig [siteId=");
builder.append(siteId);
builder.append(", interval=");
builder.append(interval);
builder.append(", ancfAddress=");
builder.append(ancfAddress);
builder.append(", bncfAddress=");
builder.append(bncfAddress);
builder.append(", serverHost=");
builder.append(serverHost);
builder.append(", port=");
builder.append(port);
builder.append(", protocolV=");
builder.append(protocolV);
builder.append(", mhsId=");
builder.append(mhsId);
builder.append(", transmitScript=");
builder.append(transmitScript);
builder.append("]");
return builder.toString();
}
public String getSiteId() {
return siteId;
}
public long getInterval() {
return interval;
}
public String getAncfAddress() {
return ancfAddress;
}
public String getBncfAddress() {
return bncfAddress;
}
public String getServerHost() {
return serverHost;
}
public String getPort() {
return port;
}
public String getProtocolV() {
return protocolV;
}
public String getMhsId() {
return mhsId;
}
public String getTransmitScript() {
return transmitScript;
}
}
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(FetchActiveTableSrv.class);
private final ConcurrentMap<String, ScheduledFuture<?>> siteJobInstanceMap;
private ScheduledExecutorService jobExecutor;
private volatile boolean activeServiceInstance;
/**
* Default constructor.
*/
public FetchActiveTableSrv() {
this.activeServiceInstance = false;
this.siteJobInstanceMap = new ConcurrentHashMap<String, ScheduledFuture<?>>();
}
/**
* Dummy trigger method for the timer in the camel context this bean
* monitors. Ensures this bean properly fails over between cluster members
* as needed.
*/
public void activateService() {
activeServiceInstance = true;
}
/**
* Removes a site's active table sharing job from the job pool.
*
* @param siteID
* Site identifier for the site's job to stop.
*/
public void deactivateSite(final String siteID) {
ScheduledFuture<?> siteJob = siteJobInstanceMap.remove(siteID);
if (siteJob != null) {
statusHandler.info("Deactivating FetchAT for " + siteID);
siteJob.cancel(false);
}
}
/**
* Adds a site's active table sharing job to the job pool.
*
* @param siteID
* Site identifier for the site's job to add to job pool.
* @param gfeConfig
* {@code IFPServerConfig} for the site.
*/
public void activateSite(final String siteID,
final IFPServerConfig gfeConfig) {
if (activeServiceInstance && (!siteJobInstanceMap.containsKey(siteID))) {
FetchATJobConfig jobConfig = new FetchATJobConfig(siteID, gfeConfig);
statusHandler.info("Activating FetchAT for " + siteID);
statusHandler.debug("Site: " + siteID + " config: " + jobConfig);
Runnable job = constructJob(jobConfig);
try {
ScheduledFuture<?> jobInstance = jobExecutor
.scheduleWithFixedDelay(job, 10,
jobConfig.getInterval(), TimeUnit.SECONDS);
siteJobInstanceMap.put(siteID, jobInstance);
} catch (RejectedExecutionException e) {
statusHandler.handle(Priority.PROBLEM,
"Unable to submit fetchAT job for execution:", e);
siteJobInstanceMap.remove(siteID);
}
}
}
private Runnable constructJob(final FetchATJobConfig jobConfig) {
Runnable job = new Runnable() {
@Override
public void run() {
statusHandler.info("Starting requestAT process for site: "
+ jobConfig.getSiteId());
/*
* requestAT -H ourHost -P ourPort -L ourServerProto -M mhsid -S
* ourSite -t irtWebAddr -x transmitScript
*/
List<String> args = new ArrayList<String>(17);
args.add("requestAT");
args.add("-H");
args.add(jobConfig.getServerHost());
args.add("-P");
args.add(jobConfig.getPort());
args.add("-L");
args.add(jobConfig.getProtocolV());
args.add("-M");
args.add(jobConfig.getMhsId());
args.add("-S");
args.add(jobConfig.getSiteId());
args.add("-a");
args.add(jobConfig.getAncfAddress());
args.add("-b");
args.add(jobConfig.getBncfAddress());
args.add("-x");
args.add(jobConfig.getTransmitScript());
try {
/*
* We'll wait for requestAT to finish execution so that we
* can't accidentally overlap running instances if the user
* configures the run interval too low.
*/
ProcessBuilder command = new ProcessBuilder(args);
RunProcess proc = RunProcess.getRunProcess();
proc.setProcess(command.start());
proc.waitFor();
} catch (Throwable t) {
statusHandler.error(
"Unhandled exception thrown during requestAT: ", t);
}
}
};
return job;
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#preStart()
*/
@Override
public void preStart() {
statusHandler.info("Initializing FetchATSrv...");
activeServiceInstance = true;
jobExecutor = Executors.newScheduledThreadPool(1);
for (IFPServer ifpServer : IFPServer.getActiveServers()) {
activateSite(ifpServer.getSiteId(), ifpServer.getConfig());
}
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStart()
*/
@Override
public void postStart() {
// no op
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#preStop()
*/
@Override
public void preStop() {
statusHandler.info("Shutting down FetchATSrv...");
activeServiceInstance = false;
jobExecutor.shutdown();
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStop()
*/
@Override
public void postStop() {
jobExecutor = null;
siteJobInstanceMap.clear();
}
}

View file

@ -29,7 +29,6 @@ import java.util.Map;
import jep.JepException;
import com.raytheon.edex.plugin.gfe.config.GFESiteActivation;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfigManager;
import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
@ -59,6 +58,8 @@ import com.raytheon.uf.edex.core.EdexTimerBasedThread;
* ------------ ---------- ----------- --------------------------
* Oct 20, 2011 dgilling Initial creation
* May 19, 2014 2726 rjpeter Integrate IscSendJob for graceful shutdown.
* Feb 26, 2015 4128 dgilling Switch to IFPServer.getActiveSites().
*
* </pre>
*
* @author dgilling
@ -128,8 +129,7 @@ public class SendIscSrv extends EdexTimerBasedThread {
String xmlDest = request.getXmlDest();
String siteId = id.getDbId().getSiteId();
if (!GFESiteActivation.getInstance().getActiveSites()
.contains(siteId)) {
if (!IFPServer.getActiveSites().contains(siteId)) {
statusHandler.warn("Attempted to send " + id
+ " for deactivated site " + siteId + ".");
return;

View file

@ -25,7 +25,7 @@ import java.util.Map;
import jep.JepException;
import com.raytheon.edex.plugin.gfe.config.GFESiteActivation;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.uf.common.dataplugin.gfe.python.GfePyIncludeUtil;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
@ -45,6 +45,7 @@ import com.raytheon.uf.common.python.PythonScript;
* ------------ ---------- ----------- --------------------------
* Jul 26, 2011 bphillip Initial creation
* Sep 05, 2013 #2307 dgilling Use better PythonScript constructor.
* Feb 26, 2015 #4128 dgilling Switch to IFPServer.getActiveSites().
*
* </pre>
*
@ -62,7 +63,7 @@ public class LogPurger {
public void purge() throws JepException {
for (String siteID : GFESiteActivation.getInstance().getActiveSites()) {
for (String siteID : IFPServer.getActiveSites()) {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext cx = pathMgr.getContext(
LocalizationType.EDEX_STATIC, LocalizationLevel.BASE);

View file

@ -23,8 +23,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.raytheon.edex.plugin.gfe.config.GFESiteActivation;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfigManager;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.uf.common.dataplugin.gfe.request.GetSiteTimeZoneInfoRequest;
import com.raytheon.uf.common.dataplugin.gfe.server.message.ServerResponse;
import com.raytheon.uf.common.serialization.comm.IRequestHandler;
@ -39,6 +39,7 @@ import com.raytheon.uf.common.serialization.comm.IRequestHandler;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 19, 2011 dgilling Initial creation
* Feb 26, 2015 #4128 dgilling Switch to IFPServer.getActiveSites().
*
* </pre>
*
@ -61,7 +62,7 @@ public class GetSiteTimeZoneInfoRequestHandler implements
GetSiteTimeZoneInfoRequest request) throws Exception {
ServerResponse<Map<String, String>> sr = new ServerResponse<Map<String, String>>();
Set<String> sites = GFESiteActivation.getInstance().getActiveSites();
Set<String> sites = IFPServer.getActiveSites();
Map<String, String> siteWithTimeZone = new HashMap<String, String>();
for (String site : sites) {
// getTimeZones() seems to only ever return a 1 sized List

View file

@ -78,23 +78,4 @@
<constructor-arg value="com.raytheon.uf.common.activetable.request.UnlockAndSetNextEtnRequest"/>
<constructor-arg ref="setAndUnlockEtnHandler"/>
</bean>
<bean id="fetchATSrv" class="com.raytheon.uf.edex.activetable.vtecsharing.FetchActiveTableSrv"/>
<camelContext id="activeTableSharingRoutes" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">
<route id="activeSiteForFetchATSrv">
<from uri="jms-generic:queue:gfeSiteActivated"/>
<doTry>
<bean ref="fetchATSrv" method="addSite"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:fetchATSrv?level=ERROR"/>
</doCatch>
</doTry>
</route>
</camelContext>
<bean factory-bean="contextManager" factory-method="registerClusteredContext">
<constructor-arg ref="activeTableSharingRoutes"/>
</bean>
</beans>

View file

@ -1,208 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.activetable.vtecsharing;
import java.util.Map;
/**
* Configuration information for a given site's vtec active table fetching.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 28, 2013 dgilling Initial creation
*
* </pre>
*
* @author dgilling
* @version 1.0
*/
public class FetchATJobConfig {
private String siteId;
private long interval;
private String ancfAddress;
private String bncfAddress;
private String serverHost;
private String port;
private String protocolV;
private String mhsId;
private String transmitScript;
public FetchATJobConfig(Map<String, Object> configMap) {
siteId = configMap.get("siteId").toString();
interval = ((Number) configMap.get("interval")).longValue();
ancfAddress = configMap.get("ancf").toString();
bncfAddress = configMap.get("bncf").toString();
serverHost = configMap.get("serverHost").toString();
port = configMap.get("port").toString();
protocolV = configMap.get("protocolV").toString();
mhsId = configMap.get("mhsid").toString();
transmitScript = configMap.get("transmitScript").toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
FetchATJobConfig other = (FetchATJobConfig) obj;
if (ancfAddress == null) {
if (other.ancfAddress != null) {
return false;
}
} else if (!ancfAddress.equals(other.ancfAddress)) {
return false;
}
if (bncfAddress == null) {
if (other.bncfAddress != null) {
return false;
}
} else if (!bncfAddress.equals(other.bncfAddress)) {
return false;
}
if (interval != other.interval) {
return false;
}
if (mhsId == null) {
if (other.mhsId != null) {
return false;
}
} else if (!mhsId.equals(other.mhsId)) {
return false;
}
if (port == null) {
if (other.port != null) {
return false;
}
} else if (!port.equals(other.port)) {
return false;
}
if (protocolV == null) {
if (other.protocolV != null) {
return false;
}
} else if (!protocolV.equals(other.protocolV)) {
return false;
}
if (serverHost == null) {
if (other.serverHost != null) {
return false;
}
} else if (!serverHost.equals(other.serverHost)) {
return false;
}
if (siteId == null) {
if (other.siteId != null) {
return false;
}
} else if (!siteId.equals(other.siteId)) {
return false;
}
if (transmitScript == null) {
if (other.transmitScript != null) {
return false;
}
} else if (!transmitScript.equals(other.transmitScript)) {
return false;
}
return true;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("FetchATJobConfig [siteId=");
builder.append(siteId);
builder.append(", interval=");
builder.append(interval);
builder.append(", ancfAddress=");
builder.append(ancfAddress);
builder.append(", bncfAddress=");
builder.append(bncfAddress);
builder.append(", serverHost=");
builder.append(serverHost);
builder.append(", port=");
builder.append(port);
builder.append(", protocolV=");
builder.append(protocolV);
builder.append(", mhsId=");
builder.append(mhsId);
builder.append(", transmitScript=");
builder.append(transmitScript);
builder.append("]");
return builder.toString();
}
public String getSiteId() {
return siteId;
}
public long getInterval() {
return interval;
}
public String getAncfAddress() {
return ancfAddress;
}
public String getBncfAddress() {
return bncfAddress;
}
public String getServerHost() {
return serverHost;
}
public String getPort() {
return port;
}
public String getProtocolV() {
return protocolV;
}
public String getMhsId() {
return mhsId;
}
public String getTransmitScript() {
return transmitScript;
}
}

View file

@ -1,218 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.activetable.vtecsharing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.MoreExecutors;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.registry.RegistryException;
import com.raytheon.uf.edex.site.ISiteActivationListener;
import com.raytheon.uf.edex.site.SiteAwareRegistry;
/**
* Service that fetches neighboring sites' active table entries that are
* relevant to this site using requestAT.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 28, 2013 dgilling Initial creation
* Feb 20, 2014 #2824 randerso Changed log level of message when activating FetchAT
* Registered with SiteAwareRegistry so we can stop
* fetching when site is deactivated.
*
* </pre>
*
* @author dgilling
* @version 1.0
*/
public class FetchActiveTableSrv implements ISiteActivationListener {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(FetchActiveTableSrv.class);
private final Map<String, FetchATJobConfig> siteConfigMap;
private final Map<String, ScheduledFuture<?>> siteJobInstanceMap;
private final ScheduledExecutorService jobExecutor;
public FetchActiveTableSrv() {
siteConfigMap = new ConcurrentHashMap<String, FetchATJobConfig>();
siteJobInstanceMap = new ConcurrentHashMap<String, ScheduledFuture<?>>();
jobExecutor = MoreExecutors
.getExitingScheduledExecutorService((ScheduledThreadPoolExecutor) Executors
.newScheduledThreadPool(1));
try {
SiteAwareRegistry.getInstance().register(this);
} catch (RegistryException e) {
statusHandler.handle(Priority.PROBLEM,
"Error registering with SiteAwareRegistry", e);
}
}
public void addSite(Map<String, Object> configData) {
FetchATJobConfig config = new FetchATJobConfig(configData);
final String site = config.getSiteId();
statusHandler.info("Activating FetchAT for " + site);
statusHandler.debug("Site: " + site + " config: " + config);
if ((siteConfigMap.containsKey(site))
&& siteConfigMap.get(site).equals(config)) {
return;
}
Runnable job = new Runnable() {
@Override
public void run() {
statusHandler.info("Starting requestAT process for site: "
+ site);
// requestAT -H ourHost -P ourPort -L ourServerProto -M mhsid
// -S ourSite -t irtWebAddr -x transmitScript
FetchATJobConfig jobConfig = siteConfigMap.get(site);
List<String> args = new ArrayList<String>(17);
args.add("requestAT");
args.add("-H");
args.add(jobConfig.getServerHost());
args.add("-P");
args.add(jobConfig.getPort());
args.add("-L");
args.add(jobConfig.getProtocolV());
args.add("-M");
args.add(jobConfig.getMhsId());
args.add("-S");
args.add(jobConfig.getSiteId());
args.add("-a");
args.add(jobConfig.getAncfAddress());
args.add("-b");
args.add(jobConfig.getBncfAddress());
args.add("-x");
args.add(jobConfig.getTransmitScript());
// String msg = Joiner.on(' ').join(args);
// statusHandler.debug("Running command: " + msg);
try {
ProcessBuilder command = new ProcessBuilder(args);
command.start();
} catch (IOException e) {
statusHandler.handle(Priority.PROBLEM,
"Error executing requestAT: ", e);
}
}
};
try {
siteConfigMap.put(site, config);
ScheduledFuture<?> jobInstance = jobExecutor.scheduleAtFixedRate(
job, 10, config.getInterval(), TimeUnit.SECONDS);
siteJobInstanceMap.put(site, jobInstance);
} catch (RejectedExecutionException e) {
statusHandler.handle(Priority.PROBLEM,
"Unable to submit fetchAT job for execution:", e);
siteConfigMap.remove(site);
siteJobInstanceMap.remove(site);
}
}
/*
* (non-Javadoc)
*
* @see
* com.raytheon.uf.edex.site.ISiteActivationListener#deactivateSite(java
* .lang.String)
*/
@Override
public void deactivateSite(String siteID) throws Exception {
ScheduledFuture<?> siteJob = siteJobInstanceMap.remove(siteID);
if (siteJob != null) {
statusHandler.info("Deactivating FetchAT for " + siteID);
siteJob.cancel(false);
}
siteConfigMap.remove(siteID);
}
/*
* (non-Javadoc)
*
* @see
* com.raytheon.uf.edex.site.ISiteActivationListener#activateSite(java.lang
* .String)
*/
@Override
public void activateSite(String siteID) throws Exception {
return;
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.site.ISiteActivationListener#getActiveSites()
*/
@Override
public Set<String> getActiveSites() {
return Collections.emptySet();
}
/*
* (non-Javadoc)
*
* @see
* com.raytheon.uf.edex.site.ISiteActivationListener#validateConfig(java
* .lang.String)
*/
@Override
public String validateConfig(String siteID) {
return "";
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.site.ISiteActivationListener#registered()
*/
@Override
public void registered() {
return;
}
}