Issue #2191 Addressing comments from previous change
Change-Id: Ib7e23bdfbecebe7141ef2d086d7b049530d11ce2 Former-commit-id:3c7ad386ed
[formerlya7258dd8f6
[formerly59a9aeabf0
] [formerly3c7ad386ed
[formerly c2d5beba2c9656d80851aee1a7beabe4807c14f6]]] Former-commit-id:a7258dd8f6
[formerly59a9aeabf0
] Former-commit-id:a7258dd8f6
Former-commit-id:00cb6b91da
This commit is contained in:
parent
d221aed878
commit
fd0252c440
12 changed files with 167 additions and 119 deletions
|
@ -8,7 +8,7 @@
|
||||||
<constructor-arg ref="ebxmlFederationEnabled"/>
|
<constructor-arg ref="ebxmlFederationEnabled"/>
|
||||||
<constructor-arg ref="lcmServiceImpl" />
|
<constructor-arg ref="lcmServiceImpl" />
|
||||||
<constructor-arg value="ebxml/federation/ncfFederationConfig.xml" />
|
<constructor-arg value="ebxml/federation/ncfFederationConfig.xml" />
|
||||||
<property name="replicationManager" ref="RegistryReplicationManager"/>
|
<constructor-arg ref="RegistryReplicationManager"/>
|
||||||
<property name="registryObjectDao" ref="registryObjectDao"/>
|
<property name="registryObjectDao" ref="registryObjectDao"/>
|
||||||
<property name="registryDao" ref="registryDao"/>
|
<property name="registryDao" ref="registryDao"/>
|
||||||
</bean>
|
</bean>
|
||||||
|
|
|
@ -8,8 +8,8 @@
|
||||||
<constructor-arg ref="ebxmlFederationEnabled"/>
|
<constructor-arg ref="ebxmlFederationEnabled"/>
|
||||||
<constructor-arg ref="lcmServiceImpl" />
|
<constructor-arg ref="lcmServiceImpl" />
|
||||||
<constructor-arg value="ebxml/federation/federationConfig.xml"/>
|
<constructor-arg value="ebxml/federation/federationConfig.xml"/>
|
||||||
|
<constructor-arg ref="RegistryReplicationManager"/>
|
||||||
<constructor-arg value="${NCF_ADDRESS}" />
|
<constructor-arg value="${NCF_ADDRESS}" />
|
||||||
<property name="replicationManager" ref="RegistryReplicationManager"/>
|
|
||||||
<property name="registryObjectDao" ref="registryObjectDao"/>
|
<property name="registryObjectDao" ref="registryObjectDao"/>
|
||||||
<property name="registryDao" ref="registryDao"/>
|
<property name="registryDao" ref="registryDao"/>
|
||||||
<property name="txTemplate" ref="metadataTxTemplate"/>
|
<property name="txTemplate" ref="metadataTxTemplate"/>
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
<constructor-arg ref="registryObjectDao"/>
|
<constructor-arg ref="registryObjectDao"/>
|
||||||
<constructor-arg ref="FederatedRegistryMonitor"/>
|
<constructor-arg ref="FederatedRegistryMonitor"/>
|
||||||
<constructor-arg ref="metadataTxTemplate"/>
|
<constructor-arg ref="metadataTxTemplate"/>
|
||||||
|
<constructor-arg value="${ebxml-federation-sync-threads}"/>
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
<bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
|
<bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
|
||||||
|
|
|
@ -115,14 +115,15 @@ public class FederatedRegistryMonitor extends RunnableWithTransaction {
|
||||||
regObj.setVersionInfo(new VersionInfoType());
|
regObj.setVersionInfo(new VersionInfoType());
|
||||||
regObj.setStatus(StatusTypes.APPROVED);
|
regObj.setStatus(StatusTypes.APPROVED);
|
||||||
regObj.getSlot().add(slot);
|
regObj.getSlot().add(slot);
|
||||||
|
registryObjectDao.create(regObj);
|
||||||
} else {
|
} else {
|
||||||
DateTimeValueType dateTime = (DateTimeValueType) regObj
|
DateTimeValueType dateTime = (DateTimeValueType) regObj
|
||||||
.getSlotByName(REGISTRY_AVAILABLE_ID).getSlotValue();
|
.getSlotByName(REGISTRY_AVAILABLE_ID).getSlotValue();
|
||||||
dateTime.setDateTimeValue(EbxmlObjectUtil
|
dateTime.setDateTimeValue(EbxmlObjectUtil
|
||||||
.getCurrentTimeAsXMLGregorianCalendar());
|
.getCurrentTimeAsXMLGregorianCalendar());
|
||||||
|
registryObjectDao.update(regObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
registryObjectDao.createOrUpdate(regObj);
|
|
||||||
} catch (EbxmlRegistryException e) {
|
} catch (EbxmlRegistryException e) {
|
||||||
statusHandler.error("Error updating federated time!", e);
|
statusHandler.error("Error updating federated time!", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import java.util.List;
|
||||||
import javax.xml.bind.JAXBException;
|
import javax.xml.bind.JAXBException;
|
||||||
|
|
||||||
import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.LifecycleManager;
|
import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.LifecycleManager;
|
||||||
import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.MsgRegistryException;
|
|
||||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AssociationType;
|
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AssociationType;
|
||||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.FederationType;
|
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.FederationType;
|
||||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.OrganizationType;
|
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.OrganizationType;
|
||||||
|
@ -38,6 +37,7 @@ import com.raytheon.uf.common.registry.constants.RegistryObjectTypes;
|
||||||
import com.raytheon.uf.common.registry.constants.StatusTypes;
|
import com.raytheon.uf.common.registry.constants.StatusTypes;
|
||||||
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
|
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
|
||||||
import com.raytheon.uf.common.serialization.SerializationException;
|
import com.raytheon.uf.common.serialization.SerializationException;
|
||||||
|
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
||||||
|
|
||||||
|
@ -84,9 +84,11 @@ public class NcfRegistryFederationManager extends RegistryFederationManager
|
||||||
* If errors occur while deserializing the federation properties
|
* If errors occur while deserializing the federation properties
|
||||||
*/
|
*/
|
||||||
protected NcfRegistryFederationManager(boolean federationEnabled,
|
protected NcfRegistryFederationManager(boolean federationEnabled,
|
||||||
LifecycleManager lcm, String federationPropertiesFileName)
|
LifecycleManager lcm, String federationPropertiesFileName,
|
||||||
|
RegistryReplicationManager replicationManager)
|
||||||
throws JAXBException, IOException, SerializationException {
|
throws JAXBException, IOException, SerializationException {
|
||||||
super(federationEnabled, lcm, federationPropertiesFileName);
|
super(federationEnabled, lcm, federationPropertiesFileName,
|
||||||
|
replicationManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -110,8 +112,8 @@ public class NcfRegistryFederationManager extends RegistryFederationManager
|
||||||
replicationManager.submitRemoteSubscriptions(registry);
|
replicationManager.submitRemoteSubscriptions(registry);
|
||||||
try {
|
try {
|
||||||
replicationManager.checkDownTime();
|
replicationManager.checkDownTime();
|
||||||
} catch (MsgRegistryException e) {
|
} catch (Exception e) {
|
||||||
throw new EbxmlRegistryException("Error checkint down time!", e);
|
throw new EbxmlRegistryException("Error checking down time!", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
statusHandler.info("Federation is disabled for this registry.");
|
statusHandler.info("Federation is disabled for this registry.");
|
||||||
|
|
|
@ -47,6 +47,7 @@ import com.raytheon.uf.common.serialization.JAXBManager;
|
||||||
import com.raytheon.uf.common.serialization.SerializationException;
|
import com.raytheon.uf.common.serialization.SerializationException;
|
||||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
import com.raytheon.uf.common.status.UFStatus;
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
|
import com.raytheon.uf.common.util.CollectionUtil;
|
||||||
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
|
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
|
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
||||||
|
@ -141,10 +142,12 @@ public abstract class RegistryFederationManager {
|
||||||
* If errors occur when unmarshalling the federation properties
|
* If errors occur when unmarshalling the federation properties
|
||||||
*/
|
*/
|
||||||
protected RegistryFederationManager(boolean federationEnabled,
|
protected RegistryFederationManager(boolean federationEnabled,
|
||||||
LifecycleManager lcm, String federationPropertiesFileName)
|
LifecycleManager lcm, String federationPropertiesFileName,
|
||||||
|
RegistryReplicationManager replicationManager)
|
||||||
throws JAXBException, SerializationException {
|
throws JAXBException, SerializationException {
|
||||||
this.federationEnabled = federationEnabled;
|
this.federationEnabled = federationEnabled;
|
||||||
this.lcm = lcm;
|
this.lcm = lcm;
|
||||||
|
this.replicationManager = replicationManager;
|
||||||
jaxbManager = new JAXBManager(SubmitObjectsRequest.class,
|
jaxbManager = new JAXBManager(SubmitObjectsRequest.class,
|
||||||
FederationProperties.class);
|
FederationProperties.class);
|
||||||
File federationPropertiesFile = PathManagerFactory.getPathManager()
|
File federationPropertiesFile = PathManagerFactory.getPathManager()
|
||||||
|
@ -160,6 +163,14 @@ public abstract class RegistryFederationManager {
|
||||||
federationProperties = (FederationProperties) jaxbManager
|
federationProperties = (FederationProperties) jaxbManager
|
||||||
.jaxbUnmarshalFromXmlFile(federationPropertiesFile);
|
.jaxbUnmarshalFromXmlFile(federationPropertiesFile);
|
||||||
}
|
}
|
||||||
|
if (this.replicationManager.getServers() == null
|
||||||
|
|| CollectionUtil.isNullOrEmpty(replicationManager
|
||||||
|
.getServers().getRegistryReplicationServers())) {
|
||||||
|
statusHandler
|
||||||
|
.warn("No servers configured for replication. Federation functionality is disabled");
|
||||||
|
this.federationEnabled = false;
|
||||||
|
this.replicationManager.setSubscriptionProcessingEnabled(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -211,11 +222,6 @@ public abstract class RegistryFederationManager {
|
||||||
this.registryObjectDao = registryObjectDao;
|
this.registryObjectDao = registryObjectDao;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setReplicationManager(
|
|
||||||
RegistryReplicationManager replicationManager) {
|
|
||||||
this.replicationManager = replicationManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setRegistryDao(RegistryDao registryDao) {
|
public void setRegistryDao(RegistryDao registryDao) {
|
||||||
this.registryDao = registryDao;
|
this.registryDao = registryDao;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ import com.raytheon.uf.common.registry.services.RegistryRESTServices;
|
||||||
import com.raytheon.uf.common.registry.services.RegistrySOAPServices;
|
import com.raytheon.uf.common.registry.services.RegistrySOAPServices;
|
||||||
import com.raytheon.uf.common.serialization.SerializationException;
|
import com.raytheon.uf.common.serialization.SerializationException;
|
||||||
import com.raytheon.uf.edex.database.RunnableWithTransaction;
|
import com.raytheon.uf.edex.database.RunnableWithTransaction;
|
||||||
|
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
||||||
|
|
||||||
|
@ -111,9 +112,10 @@ public class WfoRegistryFederationManager extends RegistryFederationManager
|
||||||
*/
|
*/
|
||||||
protected WfoRegistryFederationManager(boolean federationEnabled,
|
protected WfoRegistryFederationManager(boolean federationEnabled,
|
||||||
LifecycleManager lcm, String federationPropertiesFileName,
|
LifecycleManager lcm, String federationPropertiesFileName,
|
||||||
String ncfAddress) throws JAXBException, IOException,
|
RegistryReplicationManager replicationManager, String ncfAddress)
|
||||||
SerializationException {
|
throws JAXBException, IOException, SerializationException {
|
||||||
super(federationEnabled, lcm, federationPropertiesFileName);
|
super(federationEnabled, lcm, federationPropertiesFileName,
|
||||||
|
replicationManager);
|
||||||
this.ncfAddress = ncfAddress;
|
this.ncfAddress = ncfAddress;
|
||||||
scheduler = Executors.newSingleThreadScheduledExecutor();
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,6 +138,15 @@ public class RegistryReplicationManager {
|
||||||
/** Object types to automatically create subscriptions for */
|
/** Object types to automatically create subscriptions for */
|
||||||
private static List<String> objectTypes = new ArrayList<String>();
|
private static List<String> objectTypes = new ArrayList<String>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When a federation sync is necessary, this is the number of threads that
|
||||||
|
* will be used for synchronization. Configurable in the
|
||||||
|
* com.raytheon.uf.edex.registry.ebxml.properties file. Default is 25
|
||||||
|
*/
|
||||||
|
private int registrySyncThreads = 25;
|
||||||
|
|
||||||
|
private int maxSyncRetries = 3;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new RegistryReplicationManager
|
* Creates a new RegistryReplicationManager
|
||||||
*
|
*
|
||||||
|
@ -152,13 +161,14 @@ public class RegistryReplicationManager {
|
||||||
public RegistryReplicationManager(boolean subscriptionProcessingEnabled,
|
public RegistryReplicationManager(boolean subscriptionProcessingEnabled,
|
||||||
String notificationServerConfigFileName, RegistryObjectDao dao,
|
String notificationServerConfigFileName, RegistryObjectDao dao,
|
||||||
FederatedRegistryMonitor availabilityMonitor,
|
FederatedRegistryMonitor availabilityMonitor,
|
||||||
TransactionTemplate txTemplate) throws JAXBException,
|
TransactionTemplate txTemplate, int registrySyncThreads)
|
||||||
SerializationException {
|
throws JAXBException, SerializationException {
|
||||||
this.subscriptionProcessingEnabled = subscriptionProcessingEnabled;
|
this.subscriptionProcessingEnabled = subscriptionProcessingEnabled;
|
||||||
this.replicationConfigFileName = notificationServerConfigFileName;
|
this.replicationConfigFileName = notificationServerConfigFileName;
|
||||||
this.dao = dao;
|
this.dao = dao;
|
||||||
this.txTemplate = txTemplate;
|
this.txTemplate = txTemplate;
|
||||||
this.federatedRegistryMonitor = availabilityMonitor;
|
this.federatedRegistryMonitor = availabilityMonitor;
|
||||||
|
this.registrySyncThreads = registrySyncThreads;
|
||||||
jaxbManager = new JAXBManager(NotificationServers.class,
|
jaxbManager = new JAXBManager(NotificationServers.class,
|
||||||
SubscriptionType.class);
|
SubscriptionType.class);
|
||||||
File notificationServerConfigFile = PathManagerFactory.getPathManager()
|
File notificationServerConfigFile = PathManagerFactory.getPathManager()
|
||||||
|
@ -180,13 +190,9 @@ public class RegistryReplicationManager {
|
||||||
* for over 2 days, the registry is synchronized with one of the federation
|
* for over 2 days, the registry is synchronized with one of the federation
|
||||||
* members
|
* members
|
||||||
*
|
*
|
||||||
* @throws MsgRegistryException
|
* @throws Exception
|
||||||
* If errors occur during registry synchronization
|
|
||||||
* @throws EbxmlRegistryException
|
|
||||||
* If errors occur during registry synchronization
|
|
||||||
*/
|
*/
|
||||||
public void checkDownTime() throws MsgRegistryException,
|
public void checkDownTime() throws Exception {
|
||||||
EbxmlRegistryException {
|
|
||||||
long currentTime = TimeUtil.currentTimeMillis();
|
long currentTime = TimeUtil.currentTimeMillis();
|
||||||
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
|
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
|
||||||
long downTime = currentTime - lastKnownUp;
|
long downTime = currentTime - lastKnownUp;
|
||||||
|
@ -199,51 +205,77 @@ public class RegistryReplicationManager {
|
||||||
// synchronization of the
|
// synchronization of the
|
||||||
// data from the federation
|
// data from the federation
|
||||||
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION) {
|
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION) {
|
||||||
statusHandler
|
int syncAttempt = 1;
|
||||||
.warn("Registry has been down for ~2 days. Initiating federated data synchronization....");
|
for (; syncAttempt <= maxSyncRetries; syncAttempt++) {
|
||||||
List<NotificationHostConfiguration> notificationServers = servers
|
try {
|
||||||
.getRegistryReplicationServers();
|
|
||||||
if (servers == null
|
|
||||||
|| CollectionUtil.isNullOrEmpty(servers
|
|
||||||
.getRegistryReplicationServers())) {
|
|
||||||
statusHandler
|
|
||||||
.warn("No servers configured for replication. Unable to synchronize data with federation!");
|
|
||||||
} else {
|
|
||||||
NotificationHostConfiguration registryToSyncFrom = null;
|
|
||||||
for (NotificationHostConfiguration config : notificationServers) {
|
|
||||||
statusHandler.info("Checking availability of registry at: "
|
|
||||||
+ config.getRegistryBaseURL());
|
|
||||||
if (RegistryRESTServices.isRegistryAvailable(config
|
|
||||||
.getRegistryBaseURL())) {
|
|
||||||
registryToSyncFrom = config;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
statusHandler.info("Registry at "
|
|
||||||
+ config.getRegistryBaseURL()
|
|
||||||
+ " is not available...");
|
|
||||||
}
|
|
||||||
|
|
||||||
// No available registry was found!
|
|
||||||
if (registryToSyncFrom == null) {
|
|
||||||
statusHandler
|
statusHandler
|
||||||
.warn("No available registries found! Registry data will not be synchronized with the federation!");
|
.warn("Registry has been down for ~2 days. Initiating federated registry data synchronization attempt #"
|
||||||
} else {
|
+ syncAttempt
|
||||||
synchronizeRegistryWithFederation(registryToSyncFrom
|
+ "/"
|
||||||
.getRegistryBaseURL());
|
+ maxSyncRetries
|
||||||
|
+ "...");
|
||||||
|
List<NotificationHostConfiguration> notificationServers = servers
|
||||||
|
.getRegistryReplicationServers();
|
||||||
|
if (servers == null
|
||||||
|
|| CollectionUtil.isNullOrEmpty(servers
|
||||||
|
.getRegistryReplicationServers())) {
|
||||||
|
statusHandler
|
||||||
|
.error("No servers configured for replication. Unable to synchronize registry data with federation!");
|
||||||
|
} else {
|
||||||
|
NotificationHostConfiguration registryToSyncFrom = null;
|
||||||
|
for (NotificationHostConfiguration config : notificationServers) {
|
||||||
|
statusHandler
|
||||||
|
.info("Checking availability of registry at: "
|
||||||
|
+ config.getRegistryBaseURL());
|
||||||
|
if (RegistryRESTServices.isRegistryAvailable(config
|
||||||
|
.getRegistryBaseURL())) {
|
||||||
|
registryToSyncFrom = config;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
statusHandler.info("Registry at "
|
||||||
|
+ config.getRegistryBaseURL()
|
||||||
|
+ " is not available...");
|
||||||
|
}
|
||||||
|
|
||||||
|
// No available registry was found!
|
||||||
|
if (registryToSyncFrom == null) {
|
||||||
|
throw new EbxmlRegistryException(
|
||||||
|
"No available registries found! Registry data will not be synchronized with the federation!");
|
||||||
|
} else {
|
||||||
|
synchronizeRegistryWithFederation(registryToSyncFrom
|
||||||
|
.getRegistryBaseURL());
|
||||||
|
|
||||||
|
statusHandler
|
||||||
|
.info("Starting federated uptime monitor...");
|
||||||
|
scheduler.scheduleAtFixedRate(
|
||||||
|
federatedRegistryMonitor, 0, 1,
|
||||||
|
TimeUnit.MINUTES);
|
||||||
|
// Sync was successful, break out of retry loop
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (syncAttempt < maxSyncRetries) {
|
||||||
|
statusHandler.error(
|
||||||
|
"Federation registry data synchronization attempt #"
|
||||||
|
+ syncAttempt + "/" + maxSyncRetries
|
||||||
|
+ " failed! Retrying...", e);
|
||||||
|
} else {
|
||||||
|
statusHandler
|
||||||
|
.fatal("Federation registry data synchronization has failed",
|
||||||
|
e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
statusHandler.info("Starting federated uptime monitor...");
|
|
||||||
scheduler.scheduleAtFixedRate(federatedRegistryMonitor, 0, 1,
|
|
||||||
TimeUnit.MINUTES);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void synchronizeRegistryWithFederation(String remoteRegistryUrl)
|
private void synchronizeRegistryWithFederation(String remoteRegistryUrl)
|
||||||
throws MsgRegistryException, EbxmlRegistryException {
|
throws MsgRegistryException, EbxmlRegistryException {
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(25);
|
ExecutorService executor = Executors
|
||||||
|
.newFixedThreadPool(this.registrySyncThreads);
|
||||||
for (String objectType : objectTypes) {
|
for (String objectType : objectTypes) {
|
||||||
Set<String> localIds = new HashSet<String>();
|
Set<String> localIds = new HashSet<String>();
|
||||||
Set<String> remoteIds = new HashSet<String>();
|
Set<String> remoteIds = new HashSet<String>();
|
||||||
|
@ -279,23 +311,8 @@ public class RegistryReplicationManager {
|
||||||
*/
|
*/
|
||||||
for (String localId : localIds) {
|
for (String localId : localIds) {
|
||||||
if (remoteIds.contains(localId)) {
|
if (remoteIds.contains(localId)) {
|
||||||
RegistryObjectType objectToSubmit;
|
executor.submit(new RegistrySubmitTask(txTemplate, dao,
|
||||||
try {
|
localId, remoteRegistryUrl));
|
||||||
objectToSubmit = RegistryRESTServices
|
|
||||||
.getRegistryObject(RegistryObjectType.class,
|
|
||||||
remoteRegistryUrl,
|
|
||||||
localId.replaceAll(":", "%3A")
|
|
||||||
.replaceAll("\\/", "%2F"));
|
|
||||||
} catch (Exception e) {
|
|
||||||
statusHandler.error("Error getting remote object: "
|
|
||||||
+ localId, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
|
|
||||||
remoteRegistryUrl);
|
|
||||||
RegistrySubmitTask submitTask = new RegistrySubmitTask(
|
|
||||||
txTemplate, dao, objectToSubmit, remoteRegistryUrl);
|
|
||||||
executor.submit(submitTask);
|
|
||||||
} else {
|
} else {
|
||||||
RegistryRemoveTask removeTask = new RegistryRemoveTask(
|
RegistryRemoveTask removeTask = new RegistryRemoveTask(
|
||||||
txTemplate, dao, localId);
|
txTemplate, dao, localId);
|
||||||
|
@ -310,23 +327,8 @@ public class RegistryReplicationManager {
|
||||||
*/
|
*/
|
||||||
for (String remoteId : remoteIds) {
|
for (String remoteId : remoteIds) {
|
||||||
if (!localIds.contains(remoteId)) {
|
if (!localIds.contains(remoteId)) {
|
||||||
RegistryObjectType objectToSubmit;
|
executor.submit(new RegistrySubmitTask(txTemplate, dao,
|
||||||
try {
|
remoteId, remoteRegistryUrl));
|
||||||
objectToSubmit = RegistryRESTServices
|
|
||||||
.getRegistryObject(RegistryObjectType.class,
|
|
||||||
remoteRegistryUrl,
|
|
||||||
remoteId.replaceAll(":", "%3A")
|
|
||||||
.replaceAll("\\/", "%2F"));
|
|
||||||
} catch (Exception e) {
|
|
||||||
statusHandler.error("Error getting remote object: "
|
|
||||||
+ remoteId, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
|
|
||||||
remoteRegistryUrl);
|
|
||||||
RegistrySubmitTask submitTask = new RegistrySubmitTask(
|
|
||||||
txTemplate, dao, objectToSubmit, remoteRegistryUrl);
|
|
||||||
executor.submit(submitTask);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -605,10 +607,15 @@ public class RegistryReplicationManager {
|
||||||
*/
|
*/
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
statusHandler
|
||||||
|
.info("Registry shutting down. Removing subscriptions from: ["
|
||||||
|
+ remoteRegistryBaseURL + "]");
|
||||||
RegistryRESTServices.getRegistryDataAccessService(
|
RegistryRESTServices.getRegistryDataAccessService(
|
||||||
remoteRegistryBaseURL)
|
remoteRegistryBaseURL)
|
||||||
.removeSubscriptionsForSite(
|
.removeSubscriptionsForSite(
|
||||||
registry.getOwner());
|
registry.getOwner());
|
||||||
|
statusHandler.info("Subscriptions removed from: ["
|
||||||
|
+ remoteRegistryBaseURL + "]");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
success = true;
|
success = true;
|
||||||
|
|
|
@ -23,6 +23,9 @@ import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType;
|
||||||
|
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
|
import com.raytheon.uf.common.registry.services.RegistryRESTServices;
|
||||||
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
import com.raytheon.uf.edex.database.RunnableWithTransaction;
|
import com.raytheon.uf.edex.database.RunnableWithTransaction;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
||||||
|
@ -45,32 +48,54 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
||||||
*/
|
*/
|
||||||
public class RegistrySubmitTask extends RunnableWithTransaction {
|
public class RegistrySubmitTask extends RunnableWithTransaction {
|
||||||
|
|
||||||
|
private static final IUFStatusHandler statusHandler = UFStatus
|
||||||
|
.getHandler(RegistrySubmitTask.class);
|
||||||
|
|
||||||
/** The Registry Object data access object */
|
/** The Registry Object data access object */
|
||||||
private RegistryObjectDao dao;
|
private RegistryObjectDao dao;
|
||||||
|
|
||||||
/** The id of the registry object this task is submitting */
|
/** The id of the registry object this task is submitting */
|
||||||
private RegistryObjectType objectToSubmit;
|
private String objectId;
|
||||||
|
|
||||||
|
/** The URL of the remote server to get the object from */
|
||||||
|
private String remoteURL;
|
||||||
|
|
||||||
public RegistrySubmitTask(TransactionTemplate txTemplate,
|
public RegistrySubmitTask(TransactionTemplate txTemplate,
|
||||||
RegistryObjectDao dao, RegistryObjectType objectToSubmit,
|
RegistryObjectDao dao, String objectId, String remoteURL) {
|
||||||
String retrievedFrom) {
|
|
||||||
super(txTemplate);
|
super(txTemplate);
|
||||||
this.dao = dao;
|
this.dao = dao;
|
||||||
this.objectToSubmit = objectToSubmit;
|
this.objectId = objectId;
|
||||||
if (this.objectToSubmit.getSlotByName(EbxmlObjectUtil.HOME_SLOT_NAME) == null) {
|
this.remoteURL = remoteURL;
|
||||||
this.objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
|
|
||||||
retrievedFrom);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runWithTransaction() {
|
public void runWithTransaction() {
|
||||||
RegistryObjectType existingObject = dao.getById(objectToSubmit.getId());
|
try {
|
||||||
if (existingObject == null) {
|
RegistryObjectType objectToSubmit = RegistryRESTServices
|
||||||
dao.create(objectToSubmit);
|
.getRegistryObject(RegistryObjectType.class, remoteURL,
|
||||||
} else {
|
escapeObjectId(objectId));
|
||||||
dao.merge(objectToSubmit, existingObject);
|
|
||||||
|
if (objectToSubmit.getSlotByName(EbxmlObjectUtil.HOME_SLOT_NAME) == null) {
|
||||||
|
objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
|
||||||
|
remoteURL);
|
||||||
|
}
|
||||||
|
|
||||||
|
RegistryObjectType existingObject = dao.getById(objectId);
|
||||||
|
if (existingObject == null) {
|
||||||
|
dao.create(objectToSubmit);
|
||||||
|
} else {
|
||||||
|
dao.merge(objectToSubmit, existingObject);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
statusHandler.error("Error retrieving remote object: " + objectId,
|
||||||
|
e);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String escapeObjectId(String objectId) {
|
||||||
|
return objectId.replaceAll(":", "%3A").replaceAll("\\/", "%2F");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
# The period which registry subscriptions are processed
|
# The period which registry subscriptions are processed
|
||||||
ebxml-subscription-process.cron=0+0/1+*+*+*+?
|
ebxml-subscription-process.cron=0+0/1+*+*+*+?
|
||||||
ebxml-garbage-collect-process.cron=0/1+*+*+*+*+?
|
# The period which the registry runs the garbage collection
|
||||||
|
ebxml-garbage-collect-process.cron=0/10+*+*+*+*+?
|
||||||
|
# When a federation synchonization is necessary, this is the number of threads
|
||||||
|
# that will be used for synchronization
|
||||||
|
ebxml-federation-sync-threads=3
|
||||||
# Master switch enabling email transmission
|
# Master switch enabling email transmission
|
||||||
ebxml-email.enabled=false
|
ebxml-email.enabled=false
|
|
@ -23,6 +23,8 @@ import java.util.List;
|
||||||
|
|
||||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
|
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
|
||||||
|
|
||||||
|
import org.hibernate.SQLQuery;
|
||||||
|
|
||||||
import com.raytheon.uf.edex.database.dao.SessionManagedDao;
|
import com.raytheon.uf.edex.database.dao.SessionManagedDao;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,7 +71,7 @@ public class SlotTypeDao extends SessionManagedDao<Integer, SlotType> {
|
||||||
+ "select child_slot_key from ebxml.emailaddress_slot "
|
+ "select child_slot_key from ebxml.emailaddress_slot "
|
||||||
+ "UNION "
|
+ "UNION "
|
||||||
+ "select child_slot_key from ebxml.postaladdress_slot "
|
+ "select child_slot_key from ebxml.postaladdress_slot "
|
||||||
+ ") limit %s";
|
+ ")";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Class<SlotType> getEntityClass() {
|
protected Class<SlotType> getEntityClass() {
|
||||||
|
@ -85,12 +87,10 @@ public class SlotTypeDao extends SessionManagedDao<Integer, SlotType> {
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public List<Integer> getOrphanedSlotIds(int limit) {
|
public List<Integer> getOrphanedSlotIds(int limit) {
|
||||||
return this
|
SQLQuery query = this.getSessionFactory().getCurrentSession()
|
||||||
.getSessionFactory()
|
.createSQLQuery(ORPHANED_SLOT_QUERY);
|
||||||
.getCurrentSession()
|
query.setMaxResults(limit);
|
||||||
.createSQLQuery(
|
return query.list();
|
||||||
String.format(ORPHANED_SLOT_QUERY,
|
|
||||||
String.valueOf(limit))).list();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteBySlotId(Integer id) {
|
public void deleteBySlotId(Integer id) {
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class RegistryGarbageCollector {
|
||||||
* Determines how many more orphaned slots can be added to the queue
|
* Determines how many more orphaned slots can be added to the queue
|
||||||
* based on the existing queue size
|
* based on the existing queue size
|
||||||
*/
|
*/
|
||||||
int limit = QUEUE_MAX_SIZE - orphanedSlotExecutor.getQueue().size();
|
int limit = orphanedSlotExecutor.getQueue().remainingCapacity();
|
||||||
if (limit > QUEUE_MAX_SIZE * .25) {
|
if (limit > QUEUE_MAX_SIZE * .25) {
|
||||||
List<Integer> orphanedSlotIds = slotDao.getOrphanedSlotIds(limit);
|
List<Integer> orphanedSlotIds = slotDao.getOrphanedSlotIds(limit);
|
||||||
for (Integer slotId : orphanedSlotIds) {
|
for (Integer slotId : orphanedSlotIds) {
|
||||||
|
@ -160,7 +160,7 @@ public class RegistryGarbageCollector {
|
||||||
* If errors occur while enqueuing events to be deleted
|
* If errors occur while enqueuing events to be deleted
|
||||||
*/
|
*/
|
||||||
private void purgeExpiredEvents() throws EbxmlRegistryException {
|
private void purgeExpiredEvents() throws EbxmlRegistryException {
|
||||||
int limit = QUEUE_MAX_SIZE - expiredEventExecutor.getQueue().size();
|
int limit = orphanedSlotExecutor.getQueue().remainingCapacity();
|
||||||
if (limit > QUEUE_MAX_SIZE * .25) {
|
if (limit > QUEUE_MAX_SIZE * .25) {
|
||||||
List<AuditableEventType> expiredEvents = eventDao
|
List<AuditableEventType> expiredEvents = eventDao
|
||||||
.getExpiredEvents(limit);
|
.getExpiredEvents(limit);
|
||||||
|
|
Loading…
Add table
Reference in a new issue