Issue #1538 Registry Replication cleanup and fixes

Change-Id: I03c4cd01926f648b580712de890a8d914bc3881a

Former-commit-id: 6fb985176f89600f096f86f3094ebdd49d63f442
This commit is contained in:
Benjamin Phillippe 2013-09-05 13:09:11 -05:00
parent 00ba49547d
commit 087d2b72b1
18 changed files with 411 additions and 164 deletions

View file

@ -123,6 +123,9 @@
<logger name="edu">
<level value="WARN" />
</logger>
<logger name="org.apache.cxf">
<level value="ERROR"/>
</logger>
<logger name="com.raytheon.uf.common.datadelivery" additivity="false">
<level value="INFO"/>

View file

@ -30,6 +30,7 @@ package com.raytheon.uf.common.registry.constants;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 5/24/2013 2036 bphillip Initial implementation
* 9/5/2013 1538 bphillip Changed status message
* </pre>
*
* @author bphillip
@ -41,5 +42,5 @@ public class RegistryAvailability {
public static final String AVAILABLE = "Registry services available.";
/** Registry not available since the database is not yet initialized */
public static final String DB_NOT_INITIALIZED = "Registry services available, but database has not yet been initialized!";
public static final String DB_NOT_INITIALIZED = "Registry database and services are currently initializing!";
}

View file

@ -19,6 +19,7 @@
**/
package com.raytheon.uf.common.registry.services;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
@ -29,6 +30,8 @@ import javax.xml.bind.JAXBException;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.cxf.jaxrs.client.Client;
import org.apache.cxf.jaxrs.client.JAXRSClientFactory;
import com.google.common.cache.CacheBuilder;
@ -38,6 +41,7 @@ import com.google.common.io.Resources;
import com.raytheon.uf.common.registry.RegistryJaxbManager;
import com.raytheon.uf.common.registry.RegistryNamespaceMapper;
import com.raytheon.uf.common.registry.constants.RegistryAvailability;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.registry.services.rest.IRegistryAvailableRestService;
import com.raytheon.uf.common.registry.services.rest.IRegistryDataAccessService;
import com.raytheon.uf.common.registry.services.rest.IRegistryObjectsRestService;
@ -58,6 +62,7 @@ import com.raytheon.uf.common.status.UFStatus;
* 5/21/2013 2022 bphillip Initial implementation
* 7/29/2013 2191 bphillip Implemented registry data access service
* 8/1/2013 1693 bphillip Modified getregistry objects method to correctly handle response
* 9/5/2013 1538 bphillip Changed cache expiration timeout and added http header
* </pre>
*
* @author bphillip
@ -67,41 +72,37 @@ public class RegistryRESTServices {
/** Map of known registry object request services */
private static LoadingCache<String, IRegistryObjectsRestService> registryObjectServiceMap = CacheBuilder
.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, IRegistryObjectsRestService>() {
public IRegistryObjectsRestService load(String key) {
return JAXRSClientFactory.create(key,
IRegistryObjectsRestService.class);
public IRegistryObjectsRestService load(String url) {
return getPort(url, IRegistryObjectsRestService.class);
}
});
/** Map of known repository item request services */
private static LoadingCache<String, IRepositoryItemsRestService> repositoryItemServiceMap = CacheBuilder
.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, IRepositoryItemsRestService>() {
public IRepositoryItemsRestService load(String key) {
return JAXRSClientFactory.create(key,
IRepositoryItemsRestService.class);
public IRepositoryItemsRestService load(String url) {
return getPort(url, IRepositoryItemsRestService.class);
}
});
/** Map of known registry availability services */
private static LoadingCache<String, IRegistryAvailableRestService> registryAvailabilityServiceMap = CacheBuilder
.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, IRegistryAvailableRestService>() {
public IRegistryAvailableRestService load(String key) {
return JAXRSClientFactory.create(key,
IRegistryAvailableRestService.class);
public IRegistryAvailableRestService load(String url) {
return getPort(url, IRegistryAvailableRestService.class);
}
});
/** Map of known registry data access services */
private static LoadingCache<String, IRegistryDataAccessService> registryDataAccessServiceMap = CacheBuilder
.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, IRegistryDataAccessService>() {
public IRegistryDataAccessService load(String key) {
return JAXRSClientFactory.create(key,
IRegistryDataAccessService.class);
public IRegistryDataAccessService load(String url) {
return getPort(url, IRegistryDataAccessService.class);
}
});
@ -221,14 +222,23 @@ public class RegistryRESTServices {
* @return True if the registry services are available
*/
public static boolean isRegistryAvailable(String baseURL) {
String response = null;
try {
String response = getRegistryAvailableService(baseURL)
response = getRegistryAvailableService(baseURL)
.isRegistryAvailable();
if (RegistryAvailability.AVAILABLE.equals(response)) {
return true;
} else {
statusHandler.info("Registry at [" + baseURL
+ "] not available: " + response);
}
return RegistryAvailability.AVAILABLE.equals(response);
} catch (Throwable t) {
statusHandler.error(
"Registry at [" + baseURL + "] not available: ",
t.getMessage());
if (response == null) {
response = ExceptionUtils.getRootCauseMessage(t);
}
statusHandler.error("Registry at [" + baseURL + "] not available: "
+ response);
return false;
}
}
@ -275,4 +285,14 @@ public class RegistryRESTServices {
+ url + "]");
}
}
private static <T extends Object> T getPort(String url,
Class<T> serviceClass) {
T service = JAXRSClientFactory.create(url, serviceClass);
Client client = (Client) Proxy.getInvocationHandler((Proxy) service);
// Create HTTP header containing the calling registry
client.header(RegistryUtil.CALLING_REGISTRY_SOAP_HEADER_NAME,
RegistryUtil.LOCAL_REGISTRY_ADDRESS);
return service;
}
}

View file

@ -23,12 +23,13 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBException;
import javax.xml.namespace.QName;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import javax.xml.ws.wsaddressing.W3CEndpointReferenceBuilder;
@ -43,20 +44,22 @@ import oasis.names.tc.ebxml.regrep.xsd.rs.v4.RegistryExceptionType;
import oasis.names.tc.ebxml.regrep.xsd.rs.v4.RegistryResponseStatus;
import oasis.names.tc.ebxml.regrep.xsd.rs.v4.RegistryResponseType;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.headers.Header;
import org.apache.cxf.jaxb.JAXBDataBinding;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transports.http.configuration.ConnectionType;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.raytheon.uf.common.comm.ProxyConfiguration;
import com.raytheon.uf.common.comm.ProxyUtil;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.time.util.TimeUtil;
/**
*
@ -71,6 +74,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
* 4/9/2013 1802 bphillip Initial implementation
* Apr 24, 2013 1910 djohnson RegistryResponseStatus is now an enum.
* 8/28/2013 1538 bphillip Removed caches, add http client preferences
* 9/5/2013 1538 bphillip Add HTTP header information
*
* </pre>
*
@ -110,7 +114,7 @@ public class RegistrySOAPServices {
static {
proxyConfig = getProxyConfiguration();
httpClientPolicy = new HTTPClientPolicy();
httpClientPolicy.setReceiveTimeout(TimeUtil.MILLIS_PER_MINUTE * 2);
httpClientPolicy.setReceiveTimeout(15000);
httpClientPolicy.setConnectionTimeout(10000);
httpClientPolicy.setConnection(ConnectionType.KEEP_ALIVE);
httpClientPolicy.setMaxRetransmits(5);
@ -121,6 +125,51 @@ public class RegistrySOAPServices {
}
}
/** Cache of known notification services */
private static LoadingCache<String, NotificationListener> notificationManagerServices = CacheBuilder
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, NotificationListener>() {
public NotificationListener load(String key) {
return getPort(key, NotificationListener.class);
}
});
/** Cache of known lifecycle manager services */
private static LoadingCache<String, LifecycleManager> lifecycleManagerServices = CacheBuilder
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, LifecycleManager>() {
public LifecycleManager load(String key) {
return getPort(key, LifecycleManager.class);
}
});
/** Cache of known cataloger services */
private static LoadingCache<String, Cataloger> catalogerServices = CacheBuilder
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, Cataloger>() {
public Cataloger load(String key) {
return getPort(key, Cataloger.class);
}
});
/** Cache of known query services */
private static LoadingCache<String, QueryManager> queryServices = CacheBuilder
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, QueryManager>() {
public QueryManager load(String key) {
return getPort(key, QueryManager.class);
}
});
/** Cache of known validator services */
private static LoadingCache<String, Validator> validatorServices = CacheBuilder
.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, Validator>() {
public Validator load(String key) {
return getPort(key, Validator.class);
}
});
/**
* Gets the notification listener service URL for the given host
*
@ -162,7 +211,12 @@ public class RegistrySOAPServices {
*/
public static NotificationListener getNotificationListenerServiceForUrl(
final String url) throws RegistryServiceException {
return getPort(url, NotificationListener.class);
try {
return notificationManagerServices.get(url);
} catch (ExecutionException e) {
throw new RegistryServiceException(
"Error getting notification service!", e);
}
}
/**
@ -191,7 +245,12 @@ public class RegistrySOAPServices {
*/
public static LifecycleManager getLifecycleManagerServiceForUrl(
final String url) throws RegistryServiceException {
return getPort(url, LifecycleManager.class);
try {
return lifecycleManagerServices.get(url);
} catch (ExecutionException e) {
throw new RegistryServiceException(
"Error getting lifecycleManager service", e);
}
}
/**
@ -220,7 +279,12 @@ public class RegistrySOAPServices {
*/
public static Cataloger getCatalogerServiceForUrl(final String url)
throws RegistryServiceException {
return getPort(url, Cataloger.class);
try {
return catalogerServices.get(url);
} catch (ExecutionException e) {
throw new RegistryServiceException(
"Error getting cataloger service!", e);
}
}
/**
@ -248,7 +312,12 @@ public class RegistrySOAPServices {
*/
public static QueryManager getQueryServiceForUrl(final String url)
throws RegistryServiceException {
return getPort(url, QueryManager.class);
try {
return queryServices.get(url);
} catch (ExecutionException e) {
throw new RegistryServiceException(
"Error gett queryManager service!", e);
}
}
/**
@ -277,7 +346,12 @@ public class RegistrySOAPServices {
*/
public static Validator getValidatorServiceForUrl(final String url)
throws RegistryServiceException {
return getPort(url, Validator.class);
try {
return validatorServices.get(url);
} catch (ExecutionException e) {
throw new RegistryServiceException(
"Error getting validator service!", e);
}
}
/**
@ -340,27 +414,13 @@ public class RegistrySOAPServices {
W3CEndpointReference ref = endpointBuilder.build();
T port = (T) ref.getPort(serviceInterface);
((HTTPConduit) ClientProxy.getClient(port).getConduit())
.setClient(httpClientPolicy);
if (RegistryUtil.LOCAL_REGISTRY_ADDRESS != null) {
List<Header> headerList = new ArrayList<Header>(1);
Header header = null;
try {
header = new Header(new QName(
RegistryUtil.CALLING_REGISTRY_SOAP_HEADER_NAME),
RegistryUtil.LOCAL_REGISTRY_ADDRESS,
new JAXBDataBinding(String.class));
} catch (JAXBException e) {
throw new RegistryServiceException(
"Error creating header objects on service port", e);
}
headerList.add(header);
BindingProvider bindingProvider = (BindingProvider) port;
bindingProvider.getRequestContext().put(Header.HEADER_LIST,
headerList);
}
Client client = ClientProxy.getClient(port);
((HTTPConduit) client.getConduit()).setClient(httpClientPolicy);
// Create HTTP header containing the calling registry
Map<String, List<String>> headers = new HashMap<String, List<String>>();
headers.put(RegistryUtil.CALLING_REGISTRY_SOAP_HEADER_NAME,
Arrays.asList(RegistryUtil.LOCAL_REGISTRY_ADDRESS));
client.getRequestContext().put(Message.PROTOCOL_HEADERS, headers);
return port;
}

View file

@ -41,7 +41,7 @@ import org.apache.cxf.annotations.GZIP;
*/
@GZIP(threshold = 0)
@WebService(name = "NotificationListener", targetNamespace = "urn:oasis:names:tc:ebxml-regrep:wsdl:NotificationListener:interfaces:4.0")
@SOAPBinding(style = Style.RPC, parameterStyle = SOAPBinding.ParameterStyle.BARE)
@SOAPBinding(style = Style.DOCUMENT, parameterStyle = SOAPBinding.ParameterStyle.BARE)
@XmlSeeAlso({ oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectFactory.class,
oasis.names.tc.ebxml.regrep.xsd.spi.v4.ObjectFactory.class,
oasis.names.tc.ebxml.regrep.xsd.lcm.v4.ObjectFactory.class,

View file

@ -97,6 +97,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
* 6/4/2013 1707 bphillip Changed to use new NotificationServer objects
* 7/29/2013 2191 bphillip Implemented registry sync for registries that have been down for an extended period of time
* 8/1/2013 1693 bphillip Switch to use rest service instead of query manager for federation synchronization
* 9/5/2013 1538 bphillip Changed when the registry availability monitor is started
* </pre>
*
* @author bphillip
@ -145,7 +146,7 @@ public class RegistryReplicationManager {
* will be used for synchronization. Configurable in the
* com.raytheon.uf.edex.registry.ebxml.properties file. Default is 25
*/
private int registrySyncThreads = 25;
private int registrySyncThreads = 5;
private int maxSyncRetries = 3;
@ -248,16 +249,9 @@ public class RegistryReplicationManager {
} else {
synchronizeRegistryWithFederation(registryToSyncFrom
.getRegistryBaseURL());
statusHandler
.info("Starting federated uptime monitor...");
scheduler.scheduleAtFixedRate(
federatedRegistryMonitor, 0, 1,
TimeUnit.MINUTES);
// Sync was successful, break out of retry loop
break;
}
}
startUptimeMonitor();
} catch (Exception e) {
// If no servers are found, don't retry, just throw the
// exception
@ -280,6 +274,12 @@ public class RegistryReplicationManager {
}
}
private void startUptimeMonitor() {
statusHandler.info("Starting federated uptime monitor...");
scheduler.scheduleAtFixedRate(federatedRegistryMonitor, 0, 1,
TimeUnit.MINUTES);
}
private void synchronizeRegistryWithFederation(String remoteRegistryUrl)
throws MsgRegistryException, EbxmlRegistryException {
ExecutorService executor = Executors

View file

@ -39,6 +39,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 3/13/2013 1082 bphillip Initial creation
* 9/5/2013 1538 bphillip Added eagerLoadAll method
*
* </pre>
*
@ -64,6 +65,34 @@ public class SubscriptionDao extends RegistryObjectTypeDao<SubscriptionType> {
subscriptionJaxbManager = new JAXBManager(SubscriptionType.class);
}
/**
* Eagerly loads all the registry subscriptions
*
* @return All subscriptions in the registry
* @throws EbxmlRegistryException
* If errors occur while querying
*/
public List<SubscriptionType> eagerLoadAll() throws EbxmlRegistryException {
List<SubscriptionType> subs = this.template
.loadAll(SubscriptionType.class);
for (SubscriptionType sub : subs) {
try {
/*
* FIXME: This is just a quick and dirty way of fully
* initializing all the fields of the subscription. Since this
* query happens relatively infrequently, having this operation
* here does not pose any sort of performance penalty.
* Obviously, a better solution needs to be devised in the
* future
*/
subscriptionJaxbManager.marshalToXml(sub);
} catch (JAXBException e) {
throw new EbxmlRegistryException("Error initializing bean!", e);
}
}
return subs;
}
/**
* Retrieves the fully populated subscription object
*

View file

@ -0,0 +1,91 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.registry.ebxml.services;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.CollectionUtil;
/**
*
* Service interceptor for logging web service and rest calls
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 9/5/2013 1538 bphillip Initial implementation
* </pre>
*
* @author bphillip
* @version 1
*/
public class RegistryServiceInInterceptor extends
AbstractPhaseInterceptor<Message> {
/** The logger */
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RegistryServiceInInterceptor.class);
public RegistryServiceInInterceptor() {
super(Phase.RECEIVE);
}
@SuppressWarnings("unchecked")
@Override
public void handleMessage(Message message) throws Fault {
StringBuilder logMessage = new StringBuilder();
HttpServletRequest request = (HttpServletRequest) message
.get(AbstractHTTPDestination.HTTP_REQUEST);
Map<String, List<String>> headers = (Map<String, List<String>>) message
.get(Message.PROTOCOL_HEADERS);
List<String> callingRegistryList = headers
.get(RegistryUtil.CALLING_REGISTRY_SOAP_HEADER_NAME);
if (request.getRequestURI().startsWith("/rest")) {
logMessage.append("REST: ");
} else {
logMessage.append("WS: ");
}
logMessage.append("Request from [");
if (CollectionUtil.isNullOrEmpty(callingRegistryList)) {
logMessage.append(request.getRemoteAddr()).append("]: ")
.append(request.getMethod()).append(" ")
.append(request.getRequestURI());
} else {
logMessage.append(callingRegistryList.get(0)).append("]: ")
.append(request.getMethod()).append(" ")
.append(request.getRequestURI());
}
statusHandler.info(logMessage.toString());
}
}

View file

@ -112,6 +112,7 @@ public class NotificationListenerImpl implements NotificationListener {
@Override
public void onNotification(NotificationType notification) {
String clientBaseURL = EbxmlObjectUtil.getClientHost(wsContext);
RegistryType sourceRegistry = registryDao
.getRegistryByBaseURL(clientBaseURL);

View file

@ -154,7 +154,7 @@ public class RegistryNotificationManager {
SubscriptionNotificationListeners notificationListeners,
final List<ObjectRefType> objectsOfInterest)
throws EbxmlRegistryException {
int SIZE_LIMIT = 100;
int SIZE_LIMIT = 10;
final List<NotificationListenerWrapper> listeners = notificationListeners.listeners;
final SubscriptionType subscription = notificationListeners.subscription;
@ -167,6 +167,7 @@ public class RegistryNotificationManager {
int subListCount = eventsOfInterest.size() / SIZE_LIMIT;
int lastListSize = eventsOfInterest.size() % SIZE_LIMIT;
for (int i = 0; i < subListCount; i++) {
NotificationType notification = getNotification(
subscription,
listener.address,

View file

@ -46,7 +46,11 @@ import oasis.names.tc.ebxml.regrep.xsd.rim.v4.QueryType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SubscriptionType;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@ -66,6 +70,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.registry.ebxml.dao.SubscriptionDao;
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
import com.raytheon.uf.edex.registry.ebxml.services.IRegistrySubscriptionManager;
import com.raytheon.uf.edex.registry.ebxml.services.query.QueryManagerImpl;
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
@ -85,6 +90,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
* 5/21/2013 2022 bphillip Made logging less verbose. added running boolean so subscriptions are not process on every single
* event.
* 6/4/2013 2022 bphillip Changed slot type of subscription last run time. Longs were being truncated when casting to ints
* 9/5/2013 1538 bphillip Changed processing of each subscription to be in their own transaction. Subscriptions are now loaded on startup
* </pre>
*
* @author bphillip
@ -93,7 +99,8 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
@Transactional
@Component
public class RegistrySubscriptionManager implements
IRegistrySubscriptionManager {
IRegistrySubscriptionManager, ApplicationContextAware,
RegistryInitializedListener {
/** The logger instance */
private static final IUFStatusHandler statusHandler = UFStatus
@ -162,6 +169,8 @@ public class RegistrySubscriptionManager implements
private final ConcurrentMap<String, SubscriptionNotificationListeners> listeners = new ConcurrentHashMap<String, SubscriptionNotificationListeners>();
private ApplicationContext applicationContext;
public RegistrySubscriptionManager() {
}
@ -171,6 +180,22 @@ public class RegistrySubscriptionManager implements
this.subscriptionProcessingEnabled = subscriptionProcessingEnabled;
}
@Override
public void executeAfterRegistryInit() throws EbxmlRegistryException {
for (SubscriptionType subscription : subscriptionDao.eagerLoadAll()) {
statusHandler.info("Adding Subscription: " + subscription.getId());
addSubscriptionListener(subscription);
}
}
private void addSubscriptionListener(SubscriptionType subscription)
throws EbxmlRegistryException {
final List<NotificationListenerWrapper> subscriptionListeners = getNotificationListenersForSubscription(subscription);
listeners.put(subscription.getId(),
new SubscriptionNotificationListeners(subscription,
subscriptionListeners));
}
/**
* Adds subscription notification listeners for any subscriptions.
*/
@ -183,9 +208,8 @@ public class RegistrySubscriptionManager implements
try {
final SubscriptionType subscription = subscriptionDao
.eagerGetById(id);
final List<NotificationListenerWrapper> subscriptionListeners = getNotificationListenersForSubscription(subscription);
listeners.put(id, new SubscriptionNotificationListeners(
subscription, subscriptionListeners));
addSubscriptionListener(subscription);
} catch (EbxmlRegistryException e) {
statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage(),
e);
@ -305,28 +329,19 @@ public class RegistrySubscriptionManager implements
.values();
for (SubscriptionNotificationListeners subNotificationListener : subs) {
SubscriptionType sub = subscriptionDao
.getById(subNotificationListener.subscription.getId());
try {
if (subscriptionShouldRun(sub)) {
try {
processSubscription(subNotificationListener);
} catch (Throwable e) {
statusHandler.error(
"Errors occurred while processing subscription ["
+ sub.getId() + "]", e);
}
} else {
statusHandler
.info("Skipping subscription ["
+ sub.getId()
+ "]. Required notification frequency interval has not elapsed.");
}
} catch (EbxmlRegistryException e) {
statusHandler.error(
"Error processing subscription [" + sub.getId()
+ "]", e);
if (subscriptionDao
.getById(subNotificationListener.subscription.getId()) == null) {
statusHandler
.info("Registry subscription removed. Cancelling processing of subscription: "
+ subNotificationListener.subscription
.getId());
continue;
}
RegistrySubscriptionManager myself = (RegistrySubscriptionManager) applicationContext
.getBean("RegistrySubscriptionManager");
myself.processSubscription(subNotificationListener.subscription
.getId());
}
if (!subs.isEmpty()) {
statusHandler.info("Registry subscriptions processed in "
@ -422,20 +437,36 @@ public class RegistrySubscriptionManager implements
* @throws MsgRegistryException
* @throws EbxmlRegistryException
*/
private void processSubscription(
final SubscriptionNotificationListeners subscriptionNotificationsListeners)
throws MsgRegistryException, EbxmlRegistryException {
SubscriptionType subscription = subscriptionDao
.getById(subscriptionNotificationsListeners.subscription
.getId());
updateLastRunTime(subscription, TimeUtil.currentTimeMillis());
statusHandler.info("Processing subscription [" + subscription.getId()
+ "]...");
List<ObjectRefType> objectsOfInterest = getObjectsOfInterest(subscription);
if (!objectsOfInterest.isEmpty()) {
notificationManager.sendNotifications(
subscriptionNotificationsListeners, objectsOfInterest);
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void processSubscription(final String subscriptionName) {
try {
SubscriptionType subscription = subscriptionDao
.getById(subscriptionName);
if (subscription == null) {
statusHandler
.info("Registry subscription removed. Cancelling processing of subscription: "
+ subscriptionName);
return;
}
if (!subscriptionShouldRun(subscription)) {
statusHandler
.info("Skipping subscription ["
+ subscription.getId()
+ "]. Required notification frequency interval has not elapsed.");
return;
}
statusHandler.info("Processing subscription [" + subscriptionName
+ "]...");
List<ObjectRefType> objectsOfInterest = getObjectsOfInterest(subscription);
if (!objectsOfInterest.isEmpty()) {
notificationManager.sendNotifications(
listeners.get(subscriptionName), objectsOfInterest);
}
updateLastRunTime(subscription, TimeUtil.currentTimeMillis());
} catch (Throwable e) {
statusHandler.error(
"Errors occurred while processing subscription ["
+ subscriptionName + "]", e);
}
}
@ -496,4 +527,11 @@ public class RegistrySubscriptionManager implements
INotificationListenerFactory notificationListenerFactory) {
this.notificationListenerFactory = notificationListenerFactory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
}
}

View file

@ -98,6 +98,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
* 3/18/2013 1802 bphillip Modified to use transaction boundaries and spring injection
* Apr 24, 2013 1910 djohnson RegistryResponseStatus is now an enum.
* Jun 24, 2013 2106 djohnson Transaction must already be open.
* 9/5/2013 1538 bphillip Removed log message
*
* </pre>
*
@ -548,8 +549,6 @@ public class QueryManagerImpl implements QueryManager {
public QueryResponse executeQuery(ResponseOptionType responseOption,
QueryType queryType, int depth, boolean matchOlderVersions,
int maxResults, int startIndex) throws MsgRegistryException {
statusHandler
.info("Received internal request for query using specified values");
QueryRequest queryRequest = EbxmlObjectUtil.queryObjectFactory
.createQueryRequest();
queryRequest.setResponseOption(responseOption);

View file

@ -56,6 +56,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
* 3/18/2013 1802 bphillip Modified to use transaction boundaries and spring dao injection
* 4/9/2013 1802 bphillip Refactor of registry query handling
* Jun 24, 2013 2106 djohnson Requires a transaction to be open, will not create one.
* 9/5/2013 1538 bphillip Removed log message
*
* </pre>
*
@ -108,7 +109,6 @@ public abstract class AbstractEbxmlQuery implements IRegistryQuery {
}
query(queryRequest.getQuery(), queryResponse, client);
statusHandler.info("Query completed.");
}
protected QueryParameters getParameterMap(Collection<SlotType> slots,

View file

@ -28,8 +28,6 @@ import org.springframework.transaction.annotation.Transactional;
import com.raytheon.uf.common.registry.constants.RegistryAvailability;
import com.raytheon.uf.common.registry.services.rest.IRegistryAvailableRestService;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.registry.ebxml.dao.DbInit;
/**
@ -43,6 +41,7 @@ import com.raytheon.uf.edex.registry.ebxml.dao.DbInit;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 5/21/2013 2022 bphillip Initial implementation
* 9/5/2013 1538 bphillip Removed log message
* </pre>
*
* @author bphillip
@ -54,10 +53,6 @@ import com.raytheon.uf.edex.registry.ebxml.dao.DbInit;
public class RegistryAvailableRestService implements
IRegistryAvailableRestService {
/** The logger */
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RegistryAvailableRestService.class);
/**
* Creates a new RegistryAvailableRestService
*/
@ -68,7 +63,6 @@ public class RegistryAvailableRestService implements
@GET
@Produces("text/plain")
public String isRegistryAvailable() {
statusHandler.info("Received request checking registry availability");
if (DbInit.isDbInitialized()) {
return RegistryAvailability.AVAILABLE;
} else {

View file

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
@ -47,11 +48,8 @@ import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.StringValueType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.ValueType;
import org.apache.cxf.headers.Header;
import org.apache.cxf.helpers.CastUtils;
import org.w3c.dom.Element;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
/**
@ -372,28 +370,29 @@ public class EbxmlObjectUtil {
if (mc == null) {
return "INTERNAL";
}
String ip = null;
List<Header> headerList = CastUtils.cast((List<?>) mc
.get(Header.HEADER_LIST));
for (Header header : headerList) {
if (header.getObject() instanceof Element) {
if (header.getName().getLocalPart()
.equals(RegistryUtil.CALLING_REGISTRY_SOAP_HEADER_NAME)) {
return ((Element) header.getObject()).getTextContent();
}
String clientHost = null;
@SuppressWarnings("unchecked")
Map<String, List<String>> requestHeaders = (Map<String, List<String>>) mc
.get(MessageContext.HTTP_REQUEST_HEADERS);
List<String> callingRegistryHeader = requestHeaders
.get(RegistryUtil.CALLING_REGISTRY_SOAP_HEADER_NAME);
if (!CollectionUtil.isNullOrEmpty(callingRegistryHeader)) {
clientHost = callingRegistryHeader.get(0);
} else {
HttpServletRequest request = (HttpServletRequest) mc
.get(MessageContext.SERVLET_REQUEST);
for (int i = 0; (i < 5)
&& (clientHost == null || clientHost.isEmpty() || "unknown"
.equalsIgnoreCase(clientHost)); i++) {
clientHost = request.getHeader(HTTP_HEADERS.get(i));
}
if (clientHost == null || clientHost.length() == 0
|| "unknown".equalsIgnoreCase(clientHost)) {
clientHost = request.getRemoteAddr();
}
}
HttpServletRequest request = (HttpServletRequest) mc
.get(MessageContext.SERVLET_REQUEST);
for (int i = 0; (i < 5)
&& (ip == null || ip.isEmpty() || "unknown"
.equalsIgnoreCase(ip)); i++) {
ip = request.getHeader(HTTP_HEADERS.get(i));
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip;
return clientHost;
}
}

View file

@ -12,10 +12,10 @@
version="2.4">
<display-name>Data Delivery Web Services</display-name>
<context-param>
<param-name>webAppRootKey</param-name>
<param-value>registryEbxml</param-value>
</context-param>
<context-param>
<param-name>webAppRootKey</param-name>
<param-value>registryEbxml</param-value>
</context-param>
<context-param>
<param-name>contextConfigLocation</param-name>
@ -23,18 +23,11 @@
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
<listener-class>
org.springframework.web.context.ContextLoaderListener
</listener-class>
</listener>
<filter>
<filter-name>QoSFilter</filter-name>
<filter-class>org.eclipse.jetty.servlets.QoSFilter</filter-class>
<init-param>
<param-name>maxRequests</param-name>
<param-value>100</param-value>
</init-param>
</filter>
<filter>
<filter-name>hibernateFilter</filter-name>
<filter-class>org.springframework.orm.hibernate3.support.OpenSessionInViewFilter</filter-class>
@ -52,12 +45,6 @@
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter-mapping>
<filter-name>QoSFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet>
<servlet-name>RegistryWebServiceServlet</servlet-name>
<servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>

View file

@ -24,6 +24,9 @@
<import resource="classpath:META-INF/cxf/cxf.xml" />
<import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
<bean id="webServiceInInterceptor"
class="com.raytheon.uf.edex.registry.ebxml.services.RegistryServiceInInterceptor" />
<bean id="QueryServiceWrapper"
class="com.raytheon.uf.edex.registry.ebxml.services.query.QueryManagerImplWrapper">
<constructor-arg ref="queryServiceImpl" />
@ -48,6 +51,9 @@
<jaxws:server id="RegistryRequest"
serviceClass="com.raytheon.uf.common.registry.IRegistryRequestService"
address="/registryRequest">
<jaxws:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxws:inInterceptors>
<jaxws:serviceBean>
<ref bean="routeWrapper" />
</jaxws:serviceBean>
@ -55,30 +61,45 @@
<!-- SOAP Service definitions -->
<jaxws:server id="QueryService" address="/queryManager">
<jaxws:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxws:inInterceptors>
<jaxws:serviceBean>
<ref bean="QueryServiceWrapper" />
</jaxws:serviceBean>
</jaxws:server>
<jaxws:server id="NotificationListenerService" address="/notificationListener">
<jaxws:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxws:inInterceptors>
<jaxws:serviceBean>
<ref bean="NotificationListenerServiceWrapper" />
</jaxws:serviceBean>
</jaxws:server>
<jaxws:server id="LifecycleManagerService" address="/lifecycleManager">
<jaxws:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxws:inInterceptors>
<jaxws:serviceBean>
<ref bean="LifecycleManagerServiceWrapper" />
</jaxws:serviceBean>
</jaxws:server>
<jaxws:server id="ValidatorService" address="/validator">
<jaxws:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxws:inInterceptors>
<jaxws:serviceBean>
<ref bean="ValidatorServiceWrapper" />
</jaxws:serviceBean>
</jaxws:server>
<jaxws:server id="CatalogerService" address="/cataloger">
<jaxws:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxws:inInterceptors>
<jaxws:serviceBean>
<ref bean="catalogerServiceImpl" />
</jaxws:serviceBean>
@ -87,8 +108,11 @@
<!-- REST Service Definitions -->
<jaxrs:server id="registryRestServices" address="/">
<jaxrs:inInterceptors>
<ref bean="webServiceInInterceptor" />
</jaxrs:inInterceptors>
<jaxrs:serviceBeans>
<ref bean="registryDataAccessService"/>
<ref bean="registryDataAccessService" />
<ref bean="registryAvailabilityService" />
<ref bean="registryObjectsRestService" />
<ref bean="repositoryObjectsRestService" />