Merge "Issue #2191 Addressing comments from previous change" into development

Former-commit-id: 40c4593896 [formerly 95357f0b2c [formerly 8a9557269526e7f8efd21a8c46d50f30fb9f7b6e]]
Former-commit-id: 95357f0b2c
Former-commit-id: 2f6af1cf34
This commit is contained in:
Richard Peter 2013-08-08 09:58:18 -05:00 committed by Gerrit Code Review
commit cf63f60a44
12 changed files with 167 additions and 119 deletions

View file

@ -8,7 +8,7 @@
<constructor-arg ref="ebxmlFederationEnabled"/>
<constructor-arg ref="lcmServiceImpl" />
<constructor-arg value="ebxml/federation/ncfFederationConfig.xml" />
<property name="replicationManager" ref="RegistryReplicationManager"/>
<constructor-arg ref="RegistryReplicationManager"/>
<property name="registryObjectDao" ref="registryObjectDao"/>
<property name="registryDao" ref="registryDao"/>
</bean>

View file

@ -8,8 +8,8 @@
<constructor-arg ref="ebxmlFederationEnabled"/>
<constructor-arg ref="lcmServiceImpl" />
<constructor-arg value="ebxml/federation/federationConfig.xml"/>
<constructor-arg ref="RegistryReplicationManager"/>
<constructor-arg value="${NCF_ADDRESS}" />
<property name="replicationManager" ref="RegistryReplicationManager"/>
<property name="registryObjectDao" ref="registryObjectDao"/>
<property name="registryDao" ref="registryDao"/>
<property name="txTemplate" ref="metadataTxTemplate"/>

View file

@ -14,6 +14,7 @@
<constructor-arg ref="registryObjectDao"/>
<constructor-arg ref="FederatedRegistryMonitor"/>
<constructor-arg ref="metadataTxTemplate"/>
<constructor-arg value="${ebxml-federation-sync-threads}"/>
</bean>
<bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">

View file

@ -115,14 +115,15 @@ public class FederatedRegistryMonitor extends RunnableWithTransaction {
regObj.setVersionInfo(new VersionInfoType());
regObj.setStatus(StatusTypes.APPROVED);
regObj.getSlot().add(slot);
registryObjectDao.create(regObj);
} else {
DateTimeValueType dateTime = (DateTimeValueType) regObj
.getSlotByName(REGISTRY_AVAILABLE_ID).getSlotValue();
dateTime.setDateTimeValue(EbxmlObjectUtil
.getCurrentTimeAsXMLGregorianCalendar());
registryObjectDao.update(regObj);
}
registryObjectDao.createOrUpdate(regObj);
} catch (EbxmlRegistryException e) {
statusHandler.error("Error updating federated time!", e);
}

View file

@ -26,7 +26,6 @@ import java.util.List;
import javax.xml.bind.JAXBException;
import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.LifecycleManager;
import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.MsgRegistryException;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AssociationType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.FederationType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.OrganizationType;
@ -38,6 +37,7 @@ import com.raytheon.uf.common.registry.constants.RegistryObjectTypes;
import com.raytheon.uf.common.registry.constants.StatusTypes;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
@ -84,9 +84,11 @@ public class NcfRegistryFederationManager extends RegistryFederationManager
* If errors occur while deserializing the federation properties
*/
protected NcfRegistryFederationManager(boolean federationEnabled,
LifecycleManager lcm, String federationPropertiesFileName)
LifecycleManager lcm, String federationPropertiesFileName,
RegistryReplicationManager replicationManager)
throws JAXBException, IOException, SerializationException {
super(federationEnabled, lcm, federationPropertiesFileName);
super(federationEnabled, lcm, federationPropertiesFileName,
replicationManager);
}
@Override
@ -110,8 +112,8 @@ public class NcfRegistryFederationManager extends RegistryFederationManager
replicationManager.submitRemoteSubscriptions(registry);
try {
replicationManager.checkDownTime();
} catch (MsgRegistryException e) {
throw new EbxmlRegistryException("Error checkint down time!", e);
} catch (Exception e) {
throw new EbxmlRegistryException("Error checking down time!", e);
}
} else {
statusHandler.info("Federation is disabled for this registry.");

View file

@ -47,6 +47,7 @@ import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
@ -141,10 +142,12 @@ public abstract class RegistryFederationManager {
* If errors occur when unmarshalling the federation properties
*/
protected RegistryFederationManager(boolean federationEnabled,
LifecycleManager lcm, String federationPropertiesFileName)
LifecycleManager lcm, String federationPropertiesFileName,
RegistryReplicationManager replicationManager)
throws JAXBException, SerializationException {
this.federationEnabled = federationEnabled;
this.lcm = lcm;
this.replicationManager = replicationManager;
jaxbManager = new JAXBManager(SubmitObjectsRequest.class,
FederationProperties.class);
File federationPropertiesFile = PathManagerFactory.getPathManager()
@ -160,6 +163,14 @@ public abstract class RegistryFederationManager {
federationProperties = (FederationProperties) jaxbManager
.jaxbUnmarshalFromXmlFile(federationPropertiesFile);
}
if (this.replicationManager.getServers() == null
|| CollectionUtil.isNullOrEmpty(replicationManager
.getServers().getRegistryReplicationServers())) {
statusHandler
.warn("No servers configured for replication. Federation functionality is disabled");
this.federationEnabled = false;
this.replicationManager.setSubscriptionProcessingEnabled(false);
}
}
}
@ -211,11 +222,6 @@ public abstract class RegistryFederationManager {
this.registryObjectDao = registryObjectDao;
}
public void setReplicationManager(
RegistryReplicationManager replicationManager) {
this.replicationManager = replicationManager;
}
public void setRegistryDao(RegistryDao registryDao) {
this.registryDao = registryDao;
}

View file

@ -54,6 +54,7 @@ import com.raytheon.uf.common.registry.services.RegistryRESTServices;
import com.raytheon.uf.common.registry.services.RegistrySOAPServices;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.edex.database.RunnableWithTransaction;
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryReplicationManager;
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
@ -111,9 +112,10 @@ public class WfoRegistryFederationManager extends RegistryFederationManager
*/
protected WfoRegistryFederationManager(boolean federationEnabled,
LifecycleManager lcm, String federationPropertiesFileName,
String ncfAddress) throws JAXBException, IOException,
SerializationException {
super(federationEnabled, lcm, federationPropertiesFileName);
RegistryReplicationManager replicationManager, String ncfAddress)
throws JAXBException, IOException, SerializationException {
super(federationEnabled, lcm, federationPropertiesFileName,
replicationManager);
this.ncfAddress = ncfAddress;
scheduler = Executors.newSingleThreadScheduledExecutor();
}

View file

@ -138,6 +138,15 @@ public class RegistryReplicationManager {
/** Object types to automatically create subscriptions for */
private static List<String> objectTypes = new ArrayList<String>();
/**
* When a federation sync is necessary, this is the number of threads that
* will be used for synchronization. Configurable in the
* com.raytheon.uf.edex.registry.ebxml.properties file. Default is 25
*/
private int registrySyncThreads = 25;
private int maxSyncRetries = 3;
/**
* Creates a new RegistryReplicationManager
*
@ -152,13 +161,14 @@ public class RegistryReplicationManager {
public RegistryReplicationManager(boolean subscriptionProcessingEnabled,
String notificationServerConfigFileName, RegistryObjectDao dao,
FederatedRegistryMonitor availabilityMonitor,
TransactionTemplate txTemplate) throws JAXBException,
SerializationException {
TransactionTemplate txTemplate, int registrySyncThreads)
throws JAXBException, SerializationException {
this.subscriptionProcessingEnabled = subscriptionProcessingEnabled;
this.replicationConfigFileName = notificationServerConfigFileName;
this.dao = dao;
this.txTemplate = txTemplate;
this.federatedRegistryMonitor = availabilityMonitor;
this.registrySyncThreads = registrySyncThreads;
jaxbManager = new JAXBManager(NotificationServers.class,
SubscriptionType.class);
File notificationServerConfigFile = PathManagerFactory.getPathManager()
@ -180,13 +190,9 @@ public class RegistryReplicationManager {
* for over 2 days, the registry is synchronized with one of the federation
* members
*
* @throws MsgRegistryException
* If errors occur during registry synchronization
* @throws EbxmlRegistryException
* If errors occur during registry synchronization
* @throws Exception
*/
public void checkDownTime() throws MsgRegistryException,
EbxmlRegistryException {
public void checkDownTime() throws Exception {
long currentTime = TimeUtil.currentTimeMillis();
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
long downTime = currentTime - lastKnownUp;
@ -199,51 +205,77 @@ public class RegistryReplicationManager {
// synchronization of the
// data from the federation
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION) {
statusHandler
.warn("Registry has been down for ~2 days. Initiating federated data synchronization....");
List<NotificationHostConfiguration> notificationServers = servers
.getRegistryReplicationServers();
if (servers == null
|| CollectionUtil.isNullOrEmpty(servers
.getRegistryReplicationServers())) {
statusHandler
.warn("No servers configured for replication. Unable to synchronize data with federation!");
} else {
NotificationHostConfiguration registryToSyncFrom = null;
for (NotificationHostConfiguration config : notificationServers) {
statusHandler.info("Checking availability of registry at: "
+ config.getRegistryBaseURL());
if (RegistryRESTServices.isRegistryAvailable(config
.getRegistryBaseURL())) {
registryToSyncFrom = config;
break;
}
statusHandler.info("Registry at "
+ config.getRegistryBaseURL()
+ " is not available...");
}
// No available registry was found!
if (registryToSyncFrom == null) {
int syncAttempt = 1;
for (; syncAttempt <= maxSyncRetries; syncAttempt++) {
try {
statusHandler
.warn("No available registries found! Registry data will not be synchronized with the federation!");
} else {
synchronizeRegistryWithFederation(registryToSyncFrom
.getRegistryBaseURL());
.warn("Registry has been down for ~2 days. Initiating federated registry data synchronization attempt #"
+ syncAttempt
+ "/"
+ maxSyncRetries
+ "...");
List<NotificationHostConfiguration> notificationServers = servers
.getRegistryReplicationServers();
if (servers == null
|| CollectionUtil.isNullOrEmpty(servers
.getRegistryReplicationServers())) {
statusHandler
.error("No servers configured for replication. Unable to synchronize registry data with federation!");
} else {
NotificationHostConfiguration registryToSyncFrom = null;
for (NotificationHostConfiguration config : notificationServers) {
statusHandler
.info("Checking availability of registry at: "
+ config.getRegistryBaseURL());
if (RegistryRESTServices.isRegistryAvailable(config
.getRegistryBaseURL())) {
registryToSyncFrom = config;
break;
}
statusHandler.info("Registry at "
+ config.getRegistryBaseURL()
+ " is not available...");
}
// No available registry was found!
if (registryToSyncFrom == null) {
throw new EbxmlRegistryException(
"No available registries found! Registry data will not be synchronized with the federation!");
} else {
synchronizeRegistryWithFederation(registryToSyncFrom
.getRegistryBaseURL());
statusHandler
.info("Starting federated uptime monitor...");
scheduler.scheduleAtFixedRate(
federatedRegistryMonitor, 0, 1,
TimeUnit.MINUTES);
// Sync was successful, break out of retry loop
break;
}
}
} catch (Exception e) {
if (syncAttempt < maxSyncRetries) {
statusHandler.error(
"Federation registry data synchronization attempt #"
+ syncAttempt + "/" + maxSyncRetries
+ " failed! Retrying...", e);
} else {
statusHandler
.fatal("Federation registry data synchronization has failed",
e);
throw e;
}
}
}
}
statusHandler.info("Starting federated uptime monitor...");
scheduler.scheduleAtFixedRate(federatedRegistryMonitor, 0, 1,
TimeUnit.MINUTES);
}
private void synchronizeRegistryWithFederation(String remoteRegistryUrl)
throws MsgRegistryException, EbxmlRegistryException {
ExecutorService executor = Executors.newFixedThreadPool(25);
ExecutorService executor = Executors
.newFixedThreadPool(this.registrySyncThreads);
for (String objectType : objectTypes) {
Set<String> localIds = new HashSet<String>();
Set<String> remoteIds = new HashSet<String>();
@ -279,23 +311,8 @@ public class RegistryReplicationManager {
*/
for (String localId : localIds) {
if (remoteIds.contains(localId)) {
RegistryObjectType objectToSubmit;
try {
objectToSubmit = RegistryRESTServices
.getRegistryObject(RegistryObjectType.class,
remoteRegistryUrl,
localId.replaceAll(":", "%3A")
.replaceAll("\\/", "%2F"));
} catch (Exception e) {
statusHandler.error("Error getting remote object: "
+ localId, e);
continue;
}
objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
remoteRegistryUrl);
RegistrySubmitTask submitTask = new RegistrySubmitTask(
txTemplate, dao, objectToSubmit, remoteRegistryUrl);
executor.submit(submitTask);
executor.submit(new RegistrySubmitTask(txTemplate, dao,
localId, remoteRegistryUrl));
} else {
RegistryRemoveTask removeTask = new RegistryRemoveTask(
txTemplate, dao, localId);
@ -310,23 +327,8 @@ public class RegistryReplicationManager {
*/
for (String remoteId : remoteIds) {
if (!localIds.contains(remoteId)) {
RegistryObjectType objectToSubmit;
try {
objectToSubmit = RegistryRESTServices
.getRegistryObject(RegistryObjectType.class,
remoteRegistryUrl,
remoteId.replaceAll(":", "%3A")
.replaceAll("\\/", "%2F"));
} catch (Exception e) {
statusHandler.error("Error getting remote object: "
+ remoteId, e);
continue;
}
objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
remoteRegistryUrl);
RegistrySubmitTask submitTask = new RegistrySubmitTask(
txTemplate, dao, objectToSubmit, remoteRegistryUrl);
executor.submit(submitTask);
executor.submit(new RegistrySubmitTask(txTemplate, dao,
remoteId, remoteRegistryUrl));
}
}
}
@ -605,10 +607,15 @@ public class RegistryReplicationManager {
*/
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
statusHandler
.info("Registry shutting down. Removing subscriptions from: ["
+ remoteRegistryBaseURL + "]");
RegistryRESTServices.getRegistryDataAccessService(
remoteRegistryBaseURL)
.removeSubscriptionsForSite(
registry.getOwner());
statusHandler.info("Subscriptions removed from: ["
+ remoteRegistryBaseURL + "]");
}
});
success = true;

View file

@ -23,6 +23,9 @@ import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType;
import org.springframework.transaction.support.TransactionTemplate;
import com.raytheon.uf.common.registry.services.RegistryRESTServices;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.database.RunnableWithTransaction;
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
@ -45,32 +48,54 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
*/
public class RegistrySubmitTask extends RunnableWithTransaction {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RegistrySubmitTask.class);
/** The Registry Object data access object */
private RegistryObjectDao dao;
/** The id of the registry object this task is submitting */
private RegistryObjectType objectToSubmit;
private String objectId;
/** The URL of the remote server to get the object from */
private String remoteURL;
public RegistrySubmitTask(TransactionTemplate txTemplate,
RegistryObjectDao dao, RegistryObjectType objectToSubmit,
String retrievedFrom) {
RegistryObjectDao dao, String objectId, String remoteURL) {
super(txTemplate);
this.dao = dao;
this.objectToSubmit = objectToSubmit;
if (this.objectToSubmit.getSlotByName(EbxmlObjectUtil.HOME_SLOT_NAME) == null) {
this.objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
retrievedFrom);
}
this.objectId = objectId;
this.remoteURL = remoteURL;
}
@Override
public void runWithTransaction() {
RegistryObjectType existingObject = dao.getById(objectToSubmit.getId());
if (existingObject == null) {
dao.create(objectToSubmit);
} else {
dao.merge(objectToSubmit, existingObject);
try {
RegistryObjectType objectToSubmit = RegistryRESTServices
.getRegistryObject(RegistryObjectType.class, remoteURL,
escapeObjectId(objectId));
if (objectToSubmit.getSlotByName(EbxmlObjectUtil.HOME_SLOT_NAME) == null) {
objectToSubmit.addSlot(EbxmlObjectUtil.HOME_SLOT_NAME,
remoteURL);
}
RegistryObjectType existingObject = dao.getById(objectId);
if (existingObject == null) {
dao.create(objectToSubmit);
} else {
dao.merge(objectToSubmit, existingObject);
}
} catch (Exception e) {
statusHandler.error("Error retrieving remote object: " + objectId,
e);
return;
}
}
private String escapeObjectId(String objectId) {
return objectId.replaceAll(":", "%3A").replaceAll("\\/", "%2F");
}
}

View file

@ -1,5 +1,9 @@
# The period which registry subscriptions are processed
ebxml-subscription-process.cron=0+0/1+*+*+*+?
ebxml-garbage-collect-process.cron=0/1+*+*+*+*+?
# The period which the registry runs the garbage collection
ebxml-garbage-collect-process.cron=0/10+*+*+*+*+?
# When a federation synchonization is necessary, this is the number of threads
# that will be used for synchronization
ebxml-federation-sync-threads=3
# Master switch enabling email transmission
ebxml-email.enabled=false

View file

@ -23,6 +23,8 @@ import java.util.List;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
import org.hibernate.SQLQuery;
import com.raytheon.uf.edex.database.dao.SessionManagedDao;
/**
@ -69,7 +71,7 @@ public class SlotTypeDao extends SessionManagedDao<Integer, SlotType> {
+ "select child_slot_key from ebxml.emailaddress_slot "
+ "UNION "
+ "select child_slot_key from ebxml.postaladdress_slot "
+ ") limit %s";
+ ")";
@Override
protected Class<SlotType> getEntityClass() {
@ -85,12 +87,10 @@ public class SlotTypeDao extends SessionManagedDao<Integer, SlotType> {
*/
@SuppressWarnings("unchecked")
public List<Integer> getOrphanedSlotIds(int limit) {
return this
.getSessionFactory()
.getCurrentSession()
.createSQLQuery(
String.format(ORPHANED_SLOT_QUERY,
String.valueOf(limit))).list();
SQLQuery query = this.getSessionFactory().getCurrentSession()
.createSQLQuery(ORPHANED_SLOT_QUERY);
query.setMaxResults(limit);
return query.list();
}
public void deleteBySlotId(Integer id) {

View file

@ -139,7 +139,7 @@ public class RegistryGarbageCollector {
* Determines how many more orphaned slots can be added to the queue
* based on the existing queue size
*/
int limit = QUEUE_MAX_SIZE - orphanedSlotExecutor.getQueue().size();
int limit = orphanedSlotExecutor.getQueue().remainingCapacity();
if (limit > QUEUE_MAX_SIZE * .25) {
List<Integer> orphanedSlotIds = slotDao.getOrphanedSlotIds(limit);
for (Integer slotId : orphanedSlotIds) {
@ -160,7 +160,7 @@ public class RegistryGarbageCollector {
* If errors occur while enqueuing events to be deleted
*/
private void purgeExpiredEvents() throws EbxmlRegistryException {
int limit = QUEUE_MAX_SIZE - expiredEventExecutor.getQueue().size();
int limit = orphanedSlotExecutor.getQueue().remainingCapacity();
if (limit > QUEUE_MAX_SIZE * .25) {
List<AuditableEventType> expiredEvents = eventDao
.getExpiredEvents(limit);