Merge "Issue #2613 Registry performance enhancements" into omaha_14.2.1
Former-commit-id:cfd32aee5f
[formerly4ec080e015
] [formerly375c532376
] [formerly0ed2ba8024
[formerly375c532376
[formerly 6a3b2d31829b4f6d39a6a06ec06c901b58506deb]]] Former-commit-id:0ed2ba8024
Former-commit-id: 19bf66da69dde6bc681fece6d897e713df539fe5 [formerly890f038a54
] Former-commit-id:1d339f16f5
This commit is contained in:
commit
34f55d6132
13 changed files with 194 additions and 128 deletions
|
@ -30,7 +30,7 @@ import javax.xml.bind.annotation.XmlSeeAlso;
|
|||
import oasis.names.tc.ebxml.regrep.xsd.spi.v4.CatalogObjectsRequest;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.spi.v4.CatalogObjectsResponse;
|
||||
|
||||
import org.apache.cxf.annotations.GZIP;
|
||||
import org.apache.cxf.annotations.FastInfoset;
|
||||
|
||||
import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
||||
|
||||
|
@ -46,12 +46,13 @@ import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* 2012 bphillip Initial implementation
|
||||
* 10/17/2013 1682 bphillip Added software history
|
||||
* 12/9/2013 2613 bphillip Changed to use FastInfoset
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
*/
|
||||
@GZIP(threshold = 0)
|
||||
@FastInfoset
|
||||
@WebService(name = "Cataloger", targetNamespace = EbxmlNamespaces.SPI_INT_URI)
|
||||
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
|
||||
@XmlSeeAlso({ oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectFactory.class,
|
||||
|
|
|
@ -32,7 +32,7 @@ import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.SubmitObjectsRequest;
|
|||
import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.UpdateObjectsRequest;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.rs.v4.RegistryResponseType;
|
||||
|
||||
import org.apache.cxf.annotations.GZIP;
|
||||
import org.apache.cxf.annotations.FastInfoset;
|
||||
|
||||
import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
||||
|
||||
|
@ -48,12 +48,13 @@ import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* 2012 bphillip Initial implementation
|
||||
* 10/17/2013 1682 bphillip Added software history
|
||||
* 12/9/2013 2613 bphillip Changed to use FastInfoset
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
*/
|
||||
@GZIP(threshold = 0)
|
||||
@FastInfoset
|
||||
@WebService(name = "LifecycleManager", targetNamespace = EbxmlNamespaces.RR_INT_URI)
|
||||
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
|
||||
@XmlSeeAlso({ oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectFactory.class,
|
||||
|
|
|
@ -32,7 +32,7 @@ import javax.xml.bind.annotation.XmlSeeAlso;
|
|||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.NotificationType;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.rs.v4.RegistryResponseType;
|
||||
|
||||
import org.apache.cxf.annotations.GZIP;
|
||||
import org.apache.cxf.annotations.FastInfoset;
|
||||
|
||||
import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
||||
|
||||
|
@ -46,12 +46,13 @@ import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
|||
* 2012 bphillip Initial implementation
|
||||
* 10/17/2013 1682 bphillip Added software history
|
||||
* 10/20/2013 1682 bphillip Added synchronous notification delivery
|
||||
* 12/9/2013 2613 bphillip Changed to use FastInfoset
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
*/
|
||||
@GZIP(threshold = 0)
|
||||
@FastInfoset
|
||||
@WebService(name = "NotificationListener", targetNamespace = EbxmlNamespaces.NL_INT_URI)
|
||||
@SOAPBinding(style = Style.DOCUMENT, parameterStyle = SOAPBinding.ParameterStyle.BARE)
|
||||
@XmlSeeAlso({ oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectFactory.class,
|
||||
|
|
|
@ -30,7 +30,7 @@ import javax.xml.bind.annotation.XmlSeeAlso;
|
|||
import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryRequest;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryResponse;
|
||||
|
||||
import org.apache.cxf.annotations.GZIP;
|
||||
import org.apache.cxf.annotations.FastInfoset;
|
||||
|
||||
import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
||||
|
||||
|
@ -43,12 +43,13 @@ import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* 2012 bphillip Initial implementation
|
||||
* 10/17/2013 1682 bphillip Added software history
|
||||
* 12/9/2013 2613 bphillip Changed to use FastInfoset
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
*/
|
||||
@GZIP(threshold = 0)
|
||||
@FastInfoset
|
||||
@WebService(name = "QueryManager", targetNamespace = EbxmlNamespaces.RR_INT_URI)
|
||||
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
|
||||
@XmlSeeAlso({ oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectFactory.class,
|
||||
|
|
|
@ -30,7 +30,7 @@ import javax.xml.bind.annotation.XmlSeeAlso;
|
|||
import oasis.names.tc.ebxml.regrep.xsd.spi.v4.ValidateObjectsRequest;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.spi.v4.ValidateObjectsResponse;
|
||||
|
||||
import org.apache.cxf.annotations.GZIP;
|
||||
import org.apache.cxf.annotations.FastInfoset;
|
||||
|
||||
import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
||||
|
||||
|
@ -43,12 +43,13 @@ import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* 2012 bphillip Initial implementation
|
||||
* 10/17/2013 1682 bphillip Added software history
|
||||
* 12/9/2013 2613 bphillip Changed to use FastInfoset
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
*/
|
||||
@GZIP(threshold = 0)
|
||||
@FastInfoset
|
||||
@WebService(name = "Validator", targetNamespace = EbxmlNamespaces.SPI_INT_URI)
|
||||
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
|
||||
@XmlSeeAlso({ oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectFactory.class,
|
||||
|
|
|
@ -62,6 +62,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
|
|||
* May 01, 2013 1967 njensen Fixed autoboxing for Eclipse 3.8
|
||||
* Jun 24, 2013 2106 djohnson Use IDENTIFIER generic for method signature.
|
||||
* 10/8/2013 1682 bphillip Added the createCriteria method
|
||||
* 12/9/2013 2613 bphillip Added flushAndClearSession method
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -429,6 +430,14 @@ public abstract class SessionManagedDao<IDENTIFIER extends Serializable, ENTITY
|
|||
return ((SessionFactoryImpl) template.getSessionFactory()).getDialect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes and clears the current Hibernate Session
|
||||
*/
|
||||
public void flushAndClearSession() {
|
||||
this.getSessionFactory().getCurrentSession().flush();
|
||||
this.getSessionFactory().getCurrentSession().clear();
|
||||
}
|
||||
|
||||
protected SessionFactory getSessionFactory() {
|
||||
return template.getSessionFactory();
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package com.raytheon.uf.edex.datadelivery.registry.federation;
|
||||
|
||||
import java.io.File;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
|
@ -46,6 +47,7 @@ import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.QueryManager;
|
|||
import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.Mode;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.SubmitObjectsRequest;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryRequest;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryResponse;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.query.v4.ResponseOptionType;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AssociationType;
|
||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.DeliveryInfoType;
|
||||
|
@ -97,12 +99,11 @@ import com.raytheon.uf.common.status.UFStatus;
|
|||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
import com.raytheon.uf.edex.database.RunnableWithTransaction;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.availability.FederatedRegistryMonitor;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.replication.NotificationHostConfiguration;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.replication.NotificationServers;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistryRemoveTask;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.replication.RegistrySubmitTask;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.web.DataDeliveryRESTServices;
|
||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
|
||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
||||
|
@ -149,6 +150,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
|||
* 10/30/2013 1538 bphillip Changed submitObjects method to submit objects to NCF by default
|
||||
* 11/20/2013 2534 bphillip Consolidated RegistryReplicationManager into this class and added reciprocal subscriptions. Added remote subscription monitor.
|
||||
* 12/2/2013 1829 bphillip Modified to use correct getters for slot values
|
||||
* 12/9/2013 2613 bphillip Optimized registry sync function
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -191,6 +193,12 @@ public class RegistryFederationManager implements RegistryInitializedListener {
|
|||
private static final String SUBSCRIPTION_DETAIL_FORMAT = "Replication Subscription for [%s] objects for server ["
|
||||
+ RegistryUtil.LOCAL_REGISTRY_ADDRESS + "]";
|
||||
|
||||
/** Query used for synchronizing registries */
|
||||
private static final String SYNC_QUERY = "FROM RegistryObjectType obj where obj.objectType='%s' order by obj.id asc";
|
||||
|
||||
/** Batch size for registry synchronization queries */
|
||||
private static final int SYNC_BATCH_SIZE = 250;
|
||||
|
||||
/** The address of the NCF */
|
||||
protected String ncfAddress = System.getenv("NCF_ADDRESS");
|
||||
|
||||
|
@ -259,6 +267,8 @@ public class RegistryFederationManager implements RegistryInitializedListener {
|
|||
*/
|
||||
private static AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
|
||||
private Long syncTime = 0l;
|
||||
|
||||
/**
|
||||
* Creates a new RegistryFederationManager
|
||||
*/
|
||||
|
@ -675,7 +685,8 @@ public class RegistryFederationManager implements RegistryInitializedListener {
|
|||
sub.setOwner(federationProperties.getSiteIdentifier());
|
||||
sub.setStatus(StatusTypes.APPROVED);
|
||||
|
||||
sub.setStartTime(EbxmlObjectUtil.getTimeAsXMLGregorianCalendar(0));
|
||||
sub.setStartTime(EbxmlObjectUtil.getTimeAsXMLGregorianCalendar(syncTime
|
||||
.longValue()));
|
||||
QueryType selectorQuery = new QueryType();
|
||||
selectorQuery.setQueryDefinition(CanonicalQueryTypes.ADHOC_QUERY);
|
||||
|
||||
|
@ -826,75 +837,30 @@ public class RegistryFederationManager implements RegistryInitializedListener {
|
|||
* The URL of the registry to sync with
|
||||
* @throws EbxmlRegistryException
|
||||
* If the thread executor fails to shut down properly
|
||||
* @throws MsgRegistryException
|
||||
*/
|
||||
public void synchronizeRegistryWithFederation(String remoteRegistryUrl)
|
||||
throws EbxmlRegistryException {
|
||||
public void synchronizeRegistryWithFederation(final String remoteRegistryUrl)
|
||||
throws EbxmlRegistryException, MsgRegistryException {
|
||||
long start = TimeUtil.currentTimeMillis();
|
||||
syncTime = start;
|
||||
ExecutorService executor = Executors
|
||||
.newFixedThreadPool(this.registrySyncThreads);
|
||||
for (String objectType : objectTypes) {
|
||||
Set<String> localIds = null;
|
||||
Set<String> remoteIds = null;
|
||||
statusHandler
|
||||
.info("Getting registry object Ids from local registry...");
|
||||
Collection<String> response = dataDeliveryRestClient
|
||||
.getRegistryDataAccessService(
|
||||
RegistryUtil.LOCAL_REGISTRY_ADDRESS)
|
||||
.getRegistryObjectIdsOfType(objectType).getPayload();
|
||||
if (response != null) {
|
||||
localIds = new HashSet<String>(response.size(), 1);
|
||||
localIds.addAll(response);
|
||||
} else {
|
||||
localIds = new HashSet<String>();
|
||||
}
|
||||
|
||||
statusHandler.info(localIds.size() + " objects of type "
|
||||
+ objectType + " present in local registry.");
|
||||
statusHandler.info("Getting registry object Ids from "
|
||||
+ remoteRegistryUrl + "...");
|
||||
response = dataDeliveryRestClient
|
||||
.getRegistryDataAccessService(remoteRegistryUrl)
|
||||
.getRegistryObjectIdsOfType(objectType).getPayload();
|
||||
if (response != null) {
|
||||
remoteIds = new HashSet<String>(response.size(), 1);
|
||||
remoteIds.addAll(response);
|
||||
} else {
|
||||
remoteIds = new HashSet<String>();
|
||||
}
|
||||
statusHandler.info(remoteIds.size() + " objects of type "
|
||||
+ objectType + " present on registry at "
|
||||
+ remoteRegistryUrl);
|
||||
statusHandler.info("Synchronizing objects of type " + objectType
|
||||
+ "...");
|
||||
for (final String objectType : objectTypes) {
|
||||
executor.submit(new RunnableWithTransaction(txTemplate) {
|
||||
|
||||
/*
|
||||
* Iterate through local objects and compare them with the remote
|
||||
* object inventory to determine if they need to be updated or
|
||||
* deleted locally
|
||||
*/
|
||||
for (String localId : localIds) {
|
||||
if (remoteIds.contains(localId)) {
|
||||
executor.submit(new RegistrySubmitTask(txTemplate,
|
||||
registryObjectDao, localId, remoteRegistryUrl,
|
||||
dataDeliveryRestClient));
|
||||
} else {
|
||||
RegistryRemoveTask removeTask = new RegistryRemoveTask(
|
||||
txTemplate, registryObjectDao, localId);
|
||||
executor.submit(removeTask);
|
||||
@Override
|
||||
public void runWithTransaction() {
|
||||
try {
|
||||
syncObjectType(objectType, remoteRegistryUrl);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Error synching object type [" + objectType
|
||||
+ "] with registry at ["
|
||||
+ remoteRegistryUrl + "]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Iterate through the remote objects to see if there are any
|
||||
* objects on the remote registry that do not exist local. If found,
|
||||
* retrieve them and add them to the local registry
|
||||
*/
|
||||
for (String remoteId : remoteIds) {
|
||||
if (!localIds.contains(remoteId)) {
|
||||
executor.submit(new RegistrySubmitTask(txTemplate,
|
||||
registryObjectDao, remoteId, remoteRegistryUrl,
|
||||
dataDeliveryRestClient));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for all threads to complete
|
||||
|
@ -909,9 +875,113 @@ public class RegistryFederationManager implements RegistryInitializedListener {
|
|||
throw new EbxmlRegistryException(
|
||||
"Task executor did not shutdown properly!", e);
|
||||
}
|
||||
|
||||
statusHandler.info("Registry synchronization using ["
|
||||
+ remoteRegistryUrl + "] completed successfully!");
|
||||
+ remoteRegistryUrl + "] completed successfully in "
|
||||
+ (TimeUtil.currentTimeMillis() - start) + " ms");
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronizes objects of the specified type with the specified remote
|
||||
* registry
|
||||
*
|
||||
* @param objectType
|
||||
* The object type to synchronize
|
||||
* @param remoteRegistryUrl
|
||||
* The url of the remote registry
|
||||
* @throws EbxmlRegistryException
|
||||
* If there are errors deleting existing objects
|
||||
* @throws MsgRegistryException
|
||||
* If there are errors executing the remote soap query
|
||||
*/
|
||||
public void syncObjectType(String objectType, String remoteRegistryUrl)
|
||||
throws EbxmlRegistryException, MsgRegistryException {
|
||||
|
||||
try {
|
||||
statusHandler.info("Deleting objects of type: " + objectType);
|
||||
int deleteCount = registryObjectDao
|
||||
.executeHQLStatement(
|
||||
"DELETE FROM RegistryObjectType obj where obj.objectType=:objectType",
|
||||
"objectType", objectType);
|
||||
statusHandler.info(deleteCount + " objects of type [" + objectType
|
||||
+ "] deleted from registry");
|
||||
} catch (DataAccessLayerException e) {
|
||||
throw new EbxmlRegistryException("Error deleting objects of type "
|
||||
+ objectType, e);
|
||||
}
|
||||
|
||||
statusHandler.info("Querying " + remoteRegistryUrl
|
||||
+ " for objects of type: " + objectType);
|
||||
|
||||
// Get the list of remote object ids so we can check later to ensure all
|
||||
// objects were retrieved
|
||||
Collection<String> remoteIds = dataDeliveryRestClient
|
||||
.getRegistryDataAccessService(remoteRegistryUrl)
|
||||
.getRegistryObjectIdsOfType(objectType).getPayload();
|
||||
|
||||
SlotType queryLanguageSlot = new SlotType(
|
||||
QueryConstants.QUERY_LANGUAGE, new StringValueType(
|
||||
QueryLanguages.HQL));
|
||||
SlotType queryExpressionSlot = new SlotType(
|
||||
QueryConstants.QUERY_EXPRESSION, new StringValueType(""));
|
||||
QueryRequest queryRequest = new QueryRequest();
|
||||
QueryType query = new QueryType();
|
||||
query.setQueryDefinition(CanonicalQueryTypes.ADHOC_QUERY);
|
||||
query.getSlot().add(queryLanguageSlot);
|
||||
query.getSlot().add(queryExpressionSlot);
|
||||
queryRequest.setQuery(query);
|
||||
queryRequest.setResponseOption(new ResponseOptionType(
|
||||
QueryReturnTypes.REGISTRY_OBJECT, true));
|
||||
|
||||
queryRequest.setStartIndex(new BigInteger("0"));
|
||||
queryRequest.setMaxResults(new BigInteger(String
|
||||
.valueOf(SYNC_BATCH_SIZE)));
|
||||
queryRequest.setId("Synchronizing object type: " + objectType);
|
||||
SlotType slot = queryRequest.getQuery().getSlotByName(
|
||||
QueryConstants.QUERY_EXPRESSION);
|
||||
slot.setSlotValue(new StringValueType(String.format(SYNC_QUERY,
|
||||
objectType)));
|
||||
|
||||
int queryCount = 0;
|
||||
int resultCount = 0;
|
||||
do {
|
||||
BigInteger start = queryRequest.getStartIndex().add(
|
||||
new BigInteger(String.valueOf(resultCount)));
|
||||
queryRequest.setStartIndex(start);
|
||||
statusHandler.info("Query #" + (++queryCount) + " to registry ["
|
||||
+ remoteRegistryUrl + "] for objectType [" + objectType
|
||||
+ "]");
|
||||
QueryResponse queryResponse = registrySoapServices
|
||||
.getQueryServiceForHost(remoteRegistryUrl).executeQuery(
|
||||
queryRequest);
|
||||
List<RegistryObjectType> queryResult = queryResponse
|
||||
.getRegistryObjects();
|
||||
resultCount = queryResult.size();
|
||||
|
||||
if (!queryResult.isEmpty()) {
|
||||
for (RegistryObjectType registryObject : queryResult) {
|
||||
remoteIds.remove(registryObject.getId());
|
||||
}
|
||||
statusHandler.info("Persisting " + queryResult.size()
|
||||
+ " objects of type " + objectType
|
||||
+ " to local registry");
|
||||
registryObjectDao.persistAll(queryResult);
|
||||
registryObjectDao.flushAndClearSession();
|
||||
}
|
||||
} while (resultCount > 0);
|
||||
|
||||
// Ensure we haven't missed any objects
|
||||
if (!remoteIds.isEmpty()) {
|
||||
for (String objectId : remoteIds) {
|
||||
try {
|
||||
RegistryObjectType remoteObject = dataDeliveryRestClient
|
||||
.getRegistryObject(remoteRegistryUrl, objectId);
|
||||
registryObjectDao.createOrUpdate(remoteObject);
|
||||
} catch (Throwable e) {
|
||||
statusHandler.error("Error retrieving object [" + objectId
|
||||
+ "] from [" + remoteRegistryUrl + "]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -52,7 +52,6 @@ import com.raytheon.uf.common.registry.EbxmlNamespaces;
|
|||
import com.raytheon.uf.common.registry.constants.AssociationTypes;
|
||||
import com.raytheon.uf.common.registry.constants.CanonicalQueryTypes;
|
||||
import com.raytheon.uf.common.registry.constants.DeliveryMethodTypes;
|
||||
import com.raytheon.uf.common.registry.constants.QueryLanguages;
|
||||
import com.raytheon.uf.common.registry.constants.QueryReturnTypes;
|
||||
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
|
||||
import com.raytheon.uf.common.registry.services.RegistrySOAPServices;
|
||||
|
@ -79,6 +78,7 @@ import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants;
|
|||
* 10/30/2013 1538 bphillip Initial Creation
|
||||
* 11/20/2013 2534 bphillip Added interface
|
||||
* 12/2/2013 1829 bphillip Changed to use modified call to getRegistryObject
|
||||
* 12/9/2013 2613 bphillip Changed getRegistriesSubscribedTo to not execute remote queries
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -180,42 +180,11 @@ public class RegistryFederationStatus implements IRegistryFederationService {
|
|||
List<RegistryObjectType> registries = new ArrayList<RegistryObjectType>();
|
||||
for (NotificationHostConfiguration hostConfig : this.federationManager
|
||||
.getServers().getRegistryReplicationServers()) {
|
||||
|
||||
SlotType queryLanguageSlot = new SlotType(
|
||||
QueryConstants.QUERY_LANGUAGE, new StringValueType(
|
||||
QueryLanguages.HQL));
|
||||
SlotType queryExpressionSlot = new SlotType(
|
||||
QueryConstants.QUERY_EXPRESSION, new StringValueType(
|
||||
"FROM SubscriptionType sub where sub.id like 'Replication Subscription for%"
|
||||
+ RegistryUtil.LOCAL_REGISTRY_ADDRESS
|
||||
+ "%'"));
|
||||
QueryType query = new QueryType();
|
||||
query.setQueryDefinition(CanonicalQueryTypes.ADHOC_QUERY);
|
||||
query.getSlot().add(queryLanguageSlot);
|
||||
query.getSlot().add(queryExpressionSlot);
|
||||
|
||||
QueryRequest request = new QueryRequest();
|
||||
request.setResponseOption(new ResponseOptionType(
|
||||
QueryReturnTypes.REGISTRY_OBJECT, true));
|
||||
request.setId("Replication Subscription Verification Query");
|
||||
request.setQuery(query);
|
||||
try {
|
||||
if (!registrySoapServices
|
||||
.getQueryServiceForHost(
|
||||
hostConfig.getRegistryBaseURL())
|
||||
.executeQuery(request).getRegistryObjects()
|
||||
.isEmpty()) {
|
||||
RegistryType registry = registryDao
|
||||
.getRegistryByBaseURL(hostConfig
|
||||
.getRegistryBaseURL());
|
||||
if (registry != null) {
|
||||
registries.add(registry);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Error querying remote registry", e);
|
||||
RegistryType reg = registryDao.getRegistryByBaseURL(hostConfig
|
||||
.getRegistryBaseURL());
|
||||
if (reg != null) {
|
||||
registries.add(reg);
|
||||
}
|
||||
|
||||
}
|
||||
Collections.sort(registries, REGISTRY_COMPARATOR);
|
||||
for (RegistryObjectType reg : registries) {
|
||||
|
|
|
@ -34,7 +34,9 @@ Require-Bundle: com.raytheon.uf.common.registry.schemas.ebxml;bundle-version="1.
|
|||
com.raytheon.uf.common.datadelivery.request;bundle-version="1.0.0",
|
||||
javax.mail;bundle-version="1.0.0",
|
||||
org.apache.commons.validator;bundle-version="1.2.0",
|
||||
com.sun.xml.bind;bundle-version="1.0.0"
|
||||
com.sun.xml.bind;bundle-version="1.0.0",
|
||||
org.eclipse.jetty;bundle-version="7.6.9",
|
||||
org.eclipse.jetty.util;bundle-version="8.1.3"
|
||||
Export-Package: com.raytheon.uf.edex.registry.ebxml.acp,
|
||||
com.raytheon.uf.edex.registry.ebxml.dao,
|
||||
com.raytheon.uf.edex.registry.ebxml.exception,
|
||||
|
|
|
@ -2,22 +2,23 @@
|
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
|
||||
|
||||
<bean id="webServerThreadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
|
||||
<property name="minThreads" value="5" />
|
||||
<property name="maxThreads" value="${ebxml-webserver-max-threads}" />
|
||||
</bean>
|
||||
|
||||
<bean id="ebxmlRegistryWebServer" class="org.eclipse.jetty.server.Server"
|
||||
init-method="start" destroy-method="stop">
|
||||
|
||||
<property name="threadPool">
|
||||
<bean id="ThreadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
|
||||
<property name="minThreads" value="${ebxml-webserver-min-threads}" />
|
||||
<property name="maxThreads" value="${ebxml-webserver-max-threads}" />
|
||||
</bean>
|
||||
</property>
|
||||
<property name="threadPool" ref="webServerThreadPool"/>
|
||||
|
||||
<property name="connectors">
|
||||
<list>
|
||||
<bean id="Connector" class="org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector">
|
||||
<bean id="Connector"
|
||||
class="org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector">
|
||||
<property name="port" value="${EBXML_REGISTRY_WEBSERVER_PORT}" />
|
||||
<property name="maxIdleTime" value="5000" />
|
||||
<property name="acceptors" value="8" />
|
||||
<property name="acceptors" value="2" />
|
||||
<property name="confidentialPort"
|
||||
value="${EBXML_REGISTRY_WEBSERVER_CONFIDENTIAL_PORT}" />
|
||||
</bean>
|
||||
|
|
|
@ -11,7 +11,8 @@ ebxml-federation-sync-threads=3
|
|||
ebxml-email.enabled=false
|
||||
# The maximum number of events that will be batched and send
|
||||
# in a registry replication notification
|
||||
ebxml-notification-batch-size=50
|
||||
# Configuration of thread pool used to handle web service requests
|
||||
ebxml-webserver-min-threads=16
|
||||
ebxml-webserver-max-threads=32
|
||||
ebxml-notification-batch-size=200
|
||||
# The maximum number of threads that the ebxml registry will use for processing web requests
|
||||
# This number must be >=5. As a general rule, the maximum number of connections should be:
|
||||
# 5+{registries this registry is replicating to/from}=max-threads
|
||||
ebxml-webserver-max-threads=8
|
|
@ -73,6 +73,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
|||
* 10/20/2013 1682 bphillip Added synchronous notification delivery
|
||||
* 10/23/2013 1538 bphillip Adding log messages and changed methods to handle DateTime value on
|
||||
* AuditableEvents instead of integer
|
||||
* 12/9/2013 2613 bphillip Changed start time boundary of get auditable events to be the last run time of the subscription
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -229,6 +230,13 @@ public class RegistryNotificationManager {
|
|||
|
||||
List<ObjectRefType> objectsOfInterest = getObjectsOfInterest(subscription);
|
||||
|
||||
XMLGregorianCalendar startTime = subscription
|
||||
.getSlotValue(EbxmlObjectUtil.SUBSCRIPTION_LAST_RUN_TIME_SLOT_NAME);
|
||||
|
||||
if (startTime == null) {
|
||||
startTime = subscription.getStartTime();
|
||||
}
|
||||
|
||||
List<AuditableEventType> eventsOfInterest = getEventsOfInterest(
|
||||
subscription.getStartTime(), subscription.getEndTime(),
|
||||
objectsOfInterest);
|
||||
|
|
|
@ -79,6 +79,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
|||
* 10/23/2013 1538 bphillip Removed debug code and added a change to properly update subscription run time
|
||||
* to not create duplicate slots on objects
|
||||
* 11/20/2013 2534 bphillip Moved method to get notification destinations to utility
|
||||
* 12/9/2013 2613 bphillip Setting last run time of subscription now occurs before notification is sent
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -386,9 +387,9 @@ public class RegistrySubscriptionManager implements
|
|||
}
|
||||
statusHandler.info("Processing subscription [" + subscriptionName
|
||||
+ "]...");
|
||||
updateLastRunTime(subscription, TimeUtil.currentTimeMillis());
|
||||
notificationManager.sendNotifications(listeners
|
||||
.get(subscriptionName));
|
||||
updateLastRunTime(subscription, TimeUtil.currentTimeMillis());
|
||||
} catch (Throwable e) {
|
||||
statusHandler.error(
|
||||
"Errors occurred while processing subscription ["
|
||||
|
|
Loading…
Add table
Reference in a new issue