Issue #3011 Fixed replication slowdown
Change-Id: I176d5080a5ccd533099c8f84a779b9d6f8340109 Former-commit-id: 92c584179ddbad65c115092d3bf82ca6fe114b09
This commit is contained in:
parent
000f259cf4
commit
62062bf288
11 changed files with 218 additions and 149 deletions
|
@ -43,4 +43,6 @@ public class RegistryAvailability {
|
||||||
|
|
||||||
/** Registry not available since the database is not yet initialized */
|
/** Registry not available since the database is not yet initialized */
|
||||||
public static final String DB_NOT_INITIALIZED = "Registry database and services are currently initializing!";
|
public static final String DB_NOT_INITIALIZED = "Registry database and services are currently initializing!";
|
||||||
|
|
||||||
|
public static final String SYNC_IN_PROGRESS = "Registry currently being synchronized";
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,8 +53,8 @@ public class ReplicationEventDao extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transactional(propagation = Propagation.MANDATORY, readOnly = true)
|
@Transactional(propagation = Propagation.MANDATORY, readOnly = true)
|
||||||
public List<ReplicationEvent> getReplicationEvents(String remoteRegistry) {
|
public List<ReplicationEvent> getReplicationEvents(String remoteRegistry, int batchSize) {
|
||||||
return this.executeHQLQuery(String.format(GET_REPLICATION_EVENT_QUERY,
|
return this.executeHQLQuery(String.format(GET_REPLICATION_EVENT_QUERY,
|
||||||
remoteRegistry, remoteRegistry));
|
remoteRegistry, remoteRegistry),batchSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,6 @@ import com.raytheon.uf.edex.registry.ebxml.dao.DbInit;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
|
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.exception.NoReplicationServersAvailableException;
|
|
||||||
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants;
|
import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.services.query.RegistryQueryUtil;
|
import com.raytheon.uf.edex.registry.ebxml.services.query.RegistryQueryUtil;
|
||||||
|
@ -153,6 +152,7 @@ import com.raytheon.uf.edex.registry.events.CreateAuditTrailEvent;
|
||||||
* 1/21/2014 2613 bphillip Changed max down time which requires a sync
|
* 1/21/2014 2613 bphillip Changed max down time which requires a sync
|
||||||
* Feb 11, 2014 2771 bgonzale Use Data Delivery ID instead of Site.
|
* Feb 11, 2014 2771 bgonzale Use Data Delivery ID instead of Site.
|
||||||
* 2/13/2014 2769 bphillip Refactored registry sync. Created quartz tasks to monitor registry uptime as well as subscription integrity
|
* 2/13/2014 2769 bphillip Refactored registry sync. Created quartz tasks to monitor registry uptime as well as subscription integrity
|
||||||
|
* 4/11/2014 3011 bphillip Removed automatic registry sync check on startup
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author bphillip
|
* @author bphillip
|
||||||
|
@ -167,6 +167,9 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
protected static final IUFStatusHandler statusHandler = UFStatus
|
protected static final IUFStatusHandler statusHandler = UFStatus
|
||||||
.getHandler(RegistryFederationManager.class);
|
.getHandler(RegistryFederationManager.class);
|
||||||
|
|
||||||
|
private static final transient IUFStatusHandler monitorHandler = UFStatus
|
||||||
|
.getMonitorHandler(RegistryFederationManager.class);
|
||||||
|
|
||||||
/** Query used for synchronizing registries */
|
/** Query used for synchronizing registries */
|
||||||
private static final String SYNC_QUERY = "FROM RegistryObjectType obj where obj.id in (%s) order by obj.id asc";
|
private static final String SYNC_QUERY = "FROM RegistryObjectType obj where obj.id in (%s) order by obj.id asc";
|
||||||
|
|
||||||
|
@ -196,7 +199,16 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
* The maximum time a registry can be down before a full synchronization is
|
* The maximum time a registry can be down before a full synchronization is
|
||||||
* performed
|
* performed
|
||||||
*/
|
*/
|
||||||
private static final long MAX_DOWN_TIME_DURATION = TimeUtil.MILLIS_PER_HOUR * 6;
|
private static final long MAX_DOWN_TIME_DURATION = TimeUtil.MILLIS_PER_HOUR * 48;
|
||||||
|
|
||||||
|
private static final String SYNC_WARNING_MSG = "Registry is out of sync with federation. Registry Synchronization required. Go to: ["
|
||||||
|
+ RegistryUtil.LOCAL_REGISTRY_ADDRESS
|
||||||
|
+ "/registry/federation/status.html] to synchronize.";
|
||||||
|
|
||||||
|
private static volatile boolean SYNC_NECESSARY = false;
|
||||||
|
|
||||||
|
public static AtomicBoolean SYNC_IN_PROGRESS = new AtomicBoolean(
|
||||||
|
false);
|
||||||
|
|
||||||
/** Cutoff parameter for the query to get the expired events */
|
/** Cutoff parameter for the query to get the expired events */
|
||||||
private static final String GET_EXPIRED_EVENTS_QUERY_CUTOFF_PARAMETER = "cutoff";
|
private static final String GET_EXPIRED_EVENTS_QUERY_CUTOFF_PARAMETER = "cutoff";
|
||||||
|
@ -205,9 +217,6 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
private static final String GET_EXPIRED_EVENTS_QUERY = "FROM ReplicationEvent event where event.eventTime < :"
|
private static final String GET_EXPIRED_EVENTS_QUERY = "FROM ReplicationEvent event where event.eventTime < :"
|
||||||
+ GET_EXPIRED_EVENTS_QUERY_CUTOFF_PARAMETER;
|
+ GET_EXPIRED_EVENTS_QUERY_CUTOFF_PARAMETER;
|
||||||
|
|
||||||
/** Maximum times this registry will try to sync data before failure */
|
|
||||||
private int maxSyncRetries = 3;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Denotes if initialization has already occurred for this class. It is a
|
* Denotes if initialization has already occurred for this class. It is a
|
||||||
* static variable because at this time, multiple Spring containers load
|
* static variable because at this time, multiple Spring containers load
|
||||||
|
@ -320,8 +329,6 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
if (!centralRegistry) {
|
if (!centralRegistry) {
|
||||||
checkDownTime();
|
checkDownTime();
|
||||||
}
|
}
|
||||||
federatedRegistryMonitor.updateTime();
|
|
||||||
|
|
||||||
} catch (Exception e1) {
|
} catch (Exception e1) {
|
||||||
throw new EbxmlRegistryException(
|
throw new EbxmlRegistryException(
|
||||||
"Error initializing RegistryReplicationManager", e1);
|
"Error initializing RegistryReplicationManager", e1);
|
||||||
|
@ -345,96 +352,24 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks how long a registry has been down. If the registry has been down
|
* Checks how long a registry has been down. If the registry has been down
|
||||||
* for over 2 days, the registry is synchronized with one of the federation
|
* longer than the MAX_DOWN_TIME_DURATION, then a sync is necessary
|
||||||
* members
|
|
||||||
*
|
*
|
||||||
|
* @see RegistryFederationManager.MAX_DOWN_TIME_DURATION
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
private void checkDownTime() throws Exception {
|
private void checkDownTime() throws Exception {
|
||||||
long currentTime = TimeUtil.currentTimeMillis();
|
long currentTime = TimeUtil.currentTimeMillis();
|
||||||
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
|
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
|
||||||
long downTime = currentTime - lastKnownUp;
|
long downTime = currentTime - lastKnownUp;
|
||||||
statusHandler
|
statusHandler.info("Registry has been down since: "
|
||||||
.info("Registry has been down since: "
|
+ new Date(currentTime - downTime));
|
||||||
+ new Date(currentTime - downTime)
|
/*
|
||||||
+ ". Checking if synchronization with the federation is necessary...");
|
* The registry has been down for ~2 days, this requires a
|
||||||
|
* synchronization of the data from the federation
|
||||||
// The registry has been down for ~2 days, this requires a
|
*/
|
||||||
// synchronization of the
|
|
||||||
// data from the federation
|
|
||||||
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION) {
|
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION) {
|
||||||
int syncAttempt = 1;
|
SYNC_NECESSARY = true;
|
||||||
for (; syncAttempt <= maxSyncRetries; syncAttempt++) {
|
sendSyncMessage();
|
||||||
try {
|
|
||||||
statusHandler
|
|
||||||
.warn("Registry has been down for more than "
|
|
||||||
+ (MAX_DOWN_TIME_DURATION / TimeUtil.MILLIS_PER_HOUR)
|
|
||||||
+ " hours. Initiating federated registry data synchronization attempt #"
|
|
||||||
+ syncAttempt + "/" + maxSyncRetries
|
|
||||||
+ "...");
|
|
||||||
if (CollectionUtil.isNullOrEmpty(servers
|
|
||||||
.getRegistryReplicationServers())) {
|
|
||||||
statusHandler
|
|
||||||
.error("No servers configured for replication. Unable to synchronize registry data with federation!");
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
RegistryType registryToSyncFrom = null;
|
|
||||||
for (String remoteRegistryId : servers
|
|
||||||
.getRegistryReplicationServers()) {
|
|
||||||
statusHandler.info("Checking availability of ["
|
|
||||||
+ remoteRegistryId + "]...");
|
|
||||||
RegistryType remoteRegistry = dataDeliveryRestClient
|
|
||||||
.getRegistryObject(
|
|
||||||
ncfAddress,
|
|
||||||
remoteRegistryId
|
|
||||||
+ FederationProperties.REGISTRY_SUFFIX);
|
|
||||||
if (remoteRegistry == null) {
|
|
||||||
statusHandler
|
|
||||||
.warn("Registry at ["
|
|
||||||
+ remoteRegistryId
|
|
||||||
+ "] not found in federation. Unable to use as synchronization source.");
|
|
||||||
} else if (dataDeliveryRestClient
|
|
||||||
.isRegistryAvailable(remoteRegistry
|
|
||||||
.getBaseURL())) {
|
|
||||||
registryToSyncFrom = remoteRegistry;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
statusHandler
|
|
||||||
.info("Registry at ["
|
|
||||||
+ remoteRegistryId
|
|
||||||
+ "] is not available. Unable to use as synchronization source.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// No available registry was found!
|
|
||||||
if (registryToSyncFrom == null) {
|
|
||||||
throw new NoReplicationServersAvailableException(
|
|
||||||
"No available registries found! Registry data will not be synchronized with the federation!");
|
|
||||||
} else {
|
|
||||||
synchronizeWithRegistry(registryToSyncFrom.getId());
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
// If no servers are found, don't retry, just throw the
|
|
||||||
// exception
|
|
||||||
if (e instanceof NoReplicationServersAvailableException) {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
if (syncAttempt < maxSyncRetries) {
|
|
||||||
statusHandler.error(
|
|
||||||
"Federation registry data synchronization attempt #"
|
|
||||||
+ syncAttempt + "/" + maxSyncRetries
|
|
||||||
+ " failed! Retrying...", e);
|
|
||||||
} else {
|
|
||||||
statusHandler
|
|
||||||
.fatal("Federation registry data synchronization has failed",
|
|
||||||
e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -586,33 +521,51 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
@Transactional
|
@Transactional
|
||||||
@GET
|
@GET
|
||||||
@Path("synchronizeWithRegistry/{registryId}")
|
@Path("synchronizeWithRegistry/{registryId}")
|
||||||
public void synchronizeWithRegistry(
|
public void synchronizeWithRegistry(@PathParam("registryId")
|
||||||
@PathParam("registryId") String registryId) throws Exception {
|
String registryId) throws Exception {
|
||||||
long start = TimeUtil.currentTimeMillis();
|
if (SYNC_IN_PROGRESS.compareAndSet(false, true)) {
|
||||||
RegistryType remoteRegistry = null;
|
try {
|
||||||
try {
|
monitorHandler.handle(Priority.WARN,
|
||||||
if (!registryId.endsWith(FederationProperties.REGISTRY_SUFFIX)) {
|
"Synchronizing registry with [" + registryId + "]...");
|
||||||
registryId += FederationProperties.REGISTRY_SUFFIX;
|
long start = TimeUtil.currentTimeMillis();
|
||||||
}
|
RegistryType remoteRegistry = null;
|
||||||
remoteRegistry = dataDeliveryRestClient.getRegistryObject(
|
try {
|
||||||
ncfAddress, registryId);
|
if (!registryId
|
||||||
} catch (Exception e) {
|
.endsWith(FederationProperties.REGISTRY_SUFFIX)) {
|
||||||
throw new EbxmlRegistryException(
|
registryId += FederationProperties.REGISTRY_SUFFIX;
|
||||||
"Error retrieving info for remote registry [" + registryId
|
}
|
||||||
+ "] ", e);
|
remoteRegistry = dataDeliveryRestClient.getRegistryObject(
|
||||||
}
|
ncfAddress, registryId);
|
||||||
if (remoteRegistry == null) {
|
} catch (Exception e) {
|
||||||
throw new EbxmlRegistryException("Unable to synchronize with ["
|
throw new EbxmlRegistryException(
|
||||||
+ registryId + "]. Registry not found in federation");
|
"Error retrieving info for remote registry ["
|
||||||
}
|
+ registryId + "] ", e);
|
||||||
String remoteRegistryUrl = remoteRegistry.getBaseURL();
|
}
|
||||||
|
if (remoteRegistry == null) {
|
||||||
|
throw new EbxmlRegistryException(
|
||||||
|
"Unable to synchronize with [" + registryId
|
||||||
|
+ "]. Registry not found in federation");
|
||||||
|
}
|
||||||
|
String remoteRegistryUrl = remoteRegistry.getBaseURL();
|
||||||
|
|
||||||
for (final String objectType : replicatedObjectTypes) {
|
for (final String objectType : replicatedObjectTypes) {
|
||||||
syncObjectType(objectType, remoteRegistryUrl);
|
syncObjectType(objectType, remoteRegistryUrl);
|
||||||
|
}
|
||||||
|
SYNC_NECESSARY = false;
|
||||||
|
federatedRegistryMonitor.updateTime();
|
||||||
|
StringBuilder syncMsg = new StringBuilder();
|
||||||
|
|
||||||
|
syncMsg.append("Registry synchronization using [")
|
||||||
|
.append(remoteRegistryUrl)
|
||||||
|
.append("] completed successfully in ")
|
||||||
|
.append((TimeUtil.currentTimeMillis() - start))
|
||||||
|
.append(" ms");
|
||||||
|
statusHandler.info(syncMsg.toString());
|
||||||
|
monitorHandler.handle(Priority.WARN, syncMsg.toString());
|
||||||
|
} finally {
|
||||||
|
SYNC_IN_PROGRESS.set(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
statusHandler.info("Registry synchronization using ["
|
|
||||||
+ remoteRegistryUrl + "] completed successfully in "
|
|
||||||
+ (TimeUtil.currentTimeMillis() - start) + " ms");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -658,6 +611,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
int remainder = remoteIds.size() % SYNC_BATCH_SIZE;
|
int remainder = remoteIds.size() % SYNC_BATCH_SIZE;
|
||||||
|
|
||||||
for (int currentBatch = 0; currentBatch < batches; currentBatch++) {
|
for (int currentBatch = 0; currentBatch < batches; currentBatch++) {
|
||||||
|
statusHandler.info("Processing batch " + (currentBatch + 1)
|
||||||
|
+ "/" + batches);
|
||||||
persistBatch(objectType, remoteRegistryUrl, remoteIds.subList(
|
persistBatch(objectType, remoteRegistryUrl, remoteIds.subList(
|
||||||
currentBatch * SYNC_BATCH_SIZE, (currentBatch + 1)
|
currentBatch * SYNC_BATCH_SIZE, (currentBatch + 1)
|
||||||
* SYNC_BATCH_SIZE));
|
* SYNC_BATCH_SIZE));
|
||||||
|
@ -714,6 +669,13 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void sendSyncMessage() {
|
||||||
|
if (!SYNC_IN_PROGRESS.get()) {
|
||||||
|
statusHandler.warn(SYNC_WARNING_MSG);
|
||||||
|
monitorHandler.handle(Priority.WARN, SYNC_WARNING_MSG);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path("isFederated")
|
@Path("isFederated")
|
||||||
@Transactional
|
@Transactional
|
||||||
|
@ -795,8 +757,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
@GET
|
@GET
|
||||||
@Path("subscribeToRegistry/{registryId}")
|
@Path("subscribeToRegistry/{registryId}")
|
||||||
@Transactional
|
@Transactional
|
||||||
public void subscribeToRegistry(@PathParam("registryId") String registryId)
|
public void subscribeToRegistry(@PathParam("registryId")
|
||||||
throws Exception {
|
String registryId) throws Exception {
|
||||||
statusHandler.info("Establishing replication with [" + registryId
|
statusHandler.info("Establishing replication with [" + registryId
|
||||||
+ "]...");
|
+ "]...");
|
||||||
RegistryType remoteRegistry = getRegistry(registryId);
|
RegistryType remoteRegistry = getRegistry(registryId);
|
||||||
|
@ -809,8 +771,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
@GET
|
@GET
|
||||||
@Path("unsubscribeFromRegistry/{registryId}")
|
@Path("unsubscribeFromRegistry/{registryId}")
|
||||||
@Transactional
|
@Transactional
|
||||||
public void unsubscribeFromRegistry(
|
public void unsubscribeFromRegistry(@PathParam("registryId")
|
||||||
@PathParam("registryId") String registryId) throws Exception {
|
String registryId) throws Exception {
|
||||||
statusHandler.info("Disconnecting replication with [" + registryId
|
statusHandler.info("Disconnecting replication with [" + registryId
|
||||||
+ "]...");
|
+ "]...");
|
||||||
RegistryType remoteRegistry = getRegistry(registryId);
|
RegistryType remoteRegistry = getRegistry(registryId);
|
||||||
|
@ -824,8 +786,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
@GET
|
@GET
|
||||||
@Path("addReplicationServer/{registryId}")
|
@Path("addReplicationServer/{registryId}")
|
||||||
@Transactional
|
@Transactional
|
||||||
public void addReplicationServer(@PathParam("registryId") String registryId)
|
public void addReplicationServer(@PathParam("registryId")
|
||||||
throws Exception {
|
String registryId) throws Exception {
|
||||||
getRegistry(registryId);
|
getRegistry(registryId);
|
||||||
servers.addReplicationServer(registryId);
|
servers.addReplicationServer(registryId);
|
||||||
saveNotificationServers();
|
saveNotificationServers();
|
||||||
|
@ -834,8 +796,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
@GET
|
@GET
|
||||||
@Path("removeReplicationServer/{registryId}")
|
@Path("removeReplicationServer/{registryId}")
|
||||||
@Transactional
|
@Transactional
|
||||||
public void removeReplicationServer(
|
public void removeReplicationServer(@PathParam("registryId")
|
||||||
@PathParam("registryId") String registryId) throws Exception {
|
String registryId) throws Exception {
|
||||||
getRegistry(registryId);
|
getRegistry(registryId);
|
||||||
servers.removeReplicationServer(registryId);
|
servers.removeReplicationServer(registryId);
|
||||||
saveNotificationServers();
|
saveNotificationServers();
|
||||||
|
@ -978,7 +940,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processReplicationEvents() {
|
public void processReplicationEvents() {
|
||||||
if (federationEnabled && DbInit.isDbInitialized() && initialized.get()) {
|
if (federationEnabled && DbInit.isDbInitialized() && initialized.get()
|
||||||
|
&& !SYNC_IN_PROGRESS.get()) {
|
||||||
if (!running.getAndSet(true)) {
|
if (!running.getAndSet(true)) {
|
||||||
try {
|
try {
|
||||||
for (final String remoteRegistryId : servers
|
for (final String remoteRegistryId : servers
|
||||||
|
@ -1029,7 +992,7 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
.getBaseURL())) {
|
.getBaseURL())) {
|
||||||
|
|
||||||
List<ReplicationEvent> events = replicationEventDao
|
List<ReplicationEvent> events = replicationEventDao
|
||||||
.getReplicationEvents(remoteRegistryId);
|
.getReplicationEvents(remoteRegistryId, SYNC_BATCH_SIZE);
|
||||||
|
|
||||||
List<SimpleEntry<String, List<ReplicationEvent>>> orderedBatchedEvents = new ArrayList<SimpleEntry<String, List<ReplicationEvent>>>();
|
List<SimpleEntry<String, List<ReplicationEvent>>> orderedBatchedEvents = new ArrayList<SimpleEntry<String, List<ReplicationEvent>>>();
|
||||||
SimpleEntry<String, List<ReplicationEvent>> lastEntry = null;
|
SimpleEntry<String, List<ReplicationEvent>> lastEntry = null;
|
||||||
|
@ -1135,7 +1098,14 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
||||||
@Transactional
|
@Transactional
|
||||||
public void updateUpTime() {
|
public void updateUpTime() {
|
||||||
if (initialized.get()) {
|
if (initialized.get()) {
|
||||||
federatedRegistryMonitor.updateTime();
|
if (SYNC_NECESSARY) {
|
||||||
|
if (!SYNC_IN_PROGRESS.get()
|
||||||
|
&& TimeUtil.newGmtCalendar().get(Calendar.MINUTE) % 15 == 0) {
|
||||||
|
sendSyncMessage();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
federatedRegistryMonitor.updateTime();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,11 +65,14 @@ public class RegistryAvailableRestService implements
|
||||||
@GET
|
@GET
|
||||||
@Produces("text/plain")
|
@Produces("text/plain")
|
||||||
public String isRegistryAvailable() {
|
public String isRegistryAvailable() {
|
||||||
if (DbInit.isDbInitialized()
|
if (DbInit.isDbInitialized()) {
|
||||||
&& RegistryFederationManager.initialized.get()) {
|
if (RegistryFederationManager.initialized.get()) {
|
||||||
return RegistryAvailability.AVAILABLE;
|
if(RegistryFederationManager.SYNC_IN_PROGRESS.get()){
|
||||||
} else {
|
return RegistryAvailability.SYNC_IN_PROGRESS;
|
||||||
return RegistryAvailability.DB_NOT_INITIALIZED;
|
}
|
||||||
|
return RegistryAvailability.AVAILABLE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return RegistryAvailability.DB_NOT_INITIALIZED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,4 +13,8 @@
|
||||||
<bean factory-bean="eventBus" factory-method="register">
|
<bean factory-bean="eventBus" factory-method="register">
|
||||||
<constructor-arg ref="AuditableEventService" />
|
<constructor-arg ref="AuditableEventService" />
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
|
<bean factory-bean="eventBus" factory-method="register">
|
||||||
|
<constructor-arg ref="RegistryGarbageCollector" />
|
||||||
|
</bean>
|
||||||
</beans>
|
</beans>
|
|
@ -36,6 +36,7 @@
|
||||||
<bean id="RegistryGarbageCollector"
|
<bean id="RegistryGarbageCollector"
|
||||||
class="com.raytheon.uf.edex.registry.ebxml.services.RegistryGarbageCollector">
|
class="com.raytheon.uf.edex.registry.ebxml.services.RegistryGarbageCollector">
|
||||||
<constructor-arg ref="AuditableEventTypeDao" />
|
<constructor-arg ref="AuditableEventTypeDao" />
|
||||||
|
<constructor-arg ref="slotTypeDao"/>
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
<bean id="objectReferenceResolver" class="com.raytheon.uf.edex.registry.ebxml.services.lifecycle.ObjectReferenceResolver">
|
<bean id="objectReferenceResolver" class="com.raytheon.uf.edex.registry.ebxml.services.lifecycle.ObjectReferenceResolver">
|
||||||
|
|
|
@ -28,7 +28,6 @@
|
||||||
<bean id="registryObjectDao"
|
<bean id="registryObjectDao"
|
||||||
class="com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao">
|
class="com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao">
|
||||||
<property name="sessionFactory" ref="metadataSessionFactory" />
|
<property name="sessionFactory" ref="metadataSessionFactory" />
|
||||||
<property name="slotDao" ref="slotTypeDao" />
|
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
<bean id="registryDao" class="com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao">
|
<bean id="registryDao" class="com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao">
|
||||||
|
|
|
@ -22,7 +22,6 @@ package com.raytheon.uf.edex.registry.ebxml.dao;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType;
|
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType;
|
||||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
|
|
||||||
|
|
||||||
import org.springframework.transaction.annotation.Propagation;
|
import org.springframework.transaction.annotation.Propagation;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
@ -45,6 +44,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
* 7/29/2013 2191 bphillip Added new methods to support registry synchronization
|
* 7/29/2013 2191 bphillip Added new methods to support registry synchronization
|
||||||
* 8/1/2013 1693 bphillip Added methods to facilitate implementation of the lifecyclemanager according to the 4.0 spec
|
* 8/1/2013 1693 bphillip Added methods to facilitate implementation of the lifecyclemanager according to the 4.0 spec
|
||||||
* 2/13/2014 2769 bphillip Added read only flags to query methods
|
* 2/13/2014 2769 bphillip Added read only flags to query methods
|
||||||
|
* 4/11/2014 3011 bphillip Changed merge to not delete unused slots
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -54,9 +54,6 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
public class RegistryObjectDao extends
|
public class RegistryObjectDao extends
|
||||||
RegistryObjectTypeDao<RegistryObjectType> {
|
RegistryObjectTypeDao<RegistryObjectType> {
|
||||||
|
|
||||||
/** Data access object for accessing slots */
|
|
||||||
private SlotTypeDao slotDao;
|
|
||||||
|
|
||||||
/** Delete object type parameterized statement */
|
/** Delete object type parameterized statement */
|
||||||
private static final String GET_IDS_BY_OBJECT_TYPE = "SELECT regObj.id FROM RegistryObjectType regObj WHERE regObj.objectType=:objectType";
|
private static final String GET_IDS_BY_OBJECT_TYPE = "SELECT regObj.id FROM RegistryObjectType regObj WHERE regObj.objectType=:objectType";
|
||||||
|
|
||||||
|
@ -85,10 +82,6 @@ public class RegistryObjectDao extends
|
||||||
*/
|
*/
|
||||||
public void merge(RegistryObjectType newObject,
|
public void merge(RegistryObjectType newObject,
|
||||||
RegistryObjectType existingObject) {
|
RegistryObjectType existingObject) {
|
||||||
// Delete the existing slot to prevent orphans
|
|
||||||
for (SlotType slot : existingObject.getSlot()) {
|
|
||||||
slotDao.delete(slot);
|
|
||||||
}
|
|
||||||
newObject.setId(existingObject.getId());
|
newObject.setId(existingObject.getId());
|
||||||
template.merge(newObject);
|
template.merge(newObject);
|
||||||
}
|
}
|
||||||
|
@ -198,8 +191,4 @@ public class RegistryObjectDao extends
|
||||||
return RegistryObjectType.class;
|
return RegistryObjectType.class;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSlotDao(SlotTypeDao slotDao) {
|
|
||||||
this.slotDao = slotDao;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,10 +27,15 @@ import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AuditableEventType;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import com.google.common.eventbus.Subscribe;
|
||||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
import com.raytheon.uf.common.status.UFStatus;
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
|
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||||
|
import com.raytheon.uf.common.util.CollectionUtil;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.dao.AuditableEventTypeDao;
|
import com.raytheon.uf.edex.registry.ebxml.dao.AuditableEventTypeDao;
|
||||||
|
import com.raytheon.uf.edex.registry.ebxml.dao.SlotTypeDao;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
|
import com.raytheon.uf.edex.registry.events.DeleteSlotEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -49,6 +54,7 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||||
* 1/15/2014 2613 bphillip Added Hibernate flush() call
|
* 1/15/2014 2613 bphillip Added Hibernate flush() call
|
||||||
* 2/4/2014 2769 bphillip Removed flush and clear call
|
* 2/4/2014 2769 bphillip Removed flush and clear call
|
||||||
* 2/13/2014 2769 bphillip Refactored to no longer use executor threads
|
* 2/13/2014 2769 bphillip Refactored to no longer use executor threads
|
||||||
|
* 4/11/2014 3011 bphillip Added slot purging via event bus notifications
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author bphillip
|
* @author bphillip
|
||||||
|
@ -68,6 +74,8 @@ public class RegistryGarbageCollector {
|
||||||
/** Data access object for AuditableEventType */
|
/** Data access object for AuditableEventType */
|
||||||
private AuditableEventTypeDao eventDao;
|
private AuditableEventTypeDao eventDao;
|
||||||
|
|
||||||
|
private SlotTypeDao slotDao;
|
||||||
|
|
||||||
/** The number of events to delete per batch */
|
/** The number of events to delete per batch */
|
||||||
private static final int DELETE_BATCH_SIZE = 100;
|
private static final int DELETE_BATCH_SIZE = 100;
|
||||||
|
|
||||||
|
@ -85,9 +93,11 @@ public class RegistryGarbageCollector {
|
||||||
* @param eventDao
|
* @param eventDao
|
||||||
* The auditable event dao to use
|
* The auditable event dao to use
|
||||||
*/
|
*/
|
||||||
public RegistryGarbageCollector(AuditableEventTypeDao eventDao) {
|
public RegistryGarbageCollector(AuditableEventTypeDao eventDao,
|
||||||
|
SlotTypeDao slotDao) {
|
||||||
this();
|
this();
|
||||||
this.eventDao = eventDao;
|
this.eventDao = eventDao;
|
||||||
|
this.slotDao = slotDao;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,4 +136,18 @@ public class RegistryGarbageCollector {
|
||||||
}
|
}
|
||||||
} while (!expiredEvents.isEmpty());
|
} while (!expiredEvents.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
public void deleteOrphanedSlot(DeleteSlotEvent slotEvent) {
|
||||||
|
if (!CollectionUtil.isNullOrEmpty(slotEvent.getSlotsToDelete())) {
|
||||||
|
long start = TimeUtil.currentTimeMillis();
|
||||||
|
statusHandler.info("Deleting "
|
||||||
|
+ slotEvent.getSlotsToDelete().size() + " slots...");
|
||||||
|
slotDao.deleteAll(slotEvent.getSlotsToDelete());
|
||||||
|
statusHandler.info("Deleted " + slotEvent.getSlotsToDelete().size()
|
||||||
|
+ " slots in " + (TimeUtil.currentTimeMillis() - start)
|
||||||
|
+ " ms");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.EbxmlExceptionUtil;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
|
||||||
import com.raytheon.uf.edex.registry.ebxml.util.xpath.RegistryXPathProcessor;
|
import com.raytheon.uf.edex.registry.ebxml.util.xpath.RegistryXPathProcessor;
|
||||||
import com.raytheon.uf.edex.registry.events.CreateAuditTrailEvent;
|
import com.raytheon.uf.edex.registry.events.CreateAuditTrailEvent;
|
||||||
|
import com.raytheon.uf.edex.registry.events.DeleteSlotEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The LifecycleManager interface allows a client to perform various lifecycle
|
* The LifecycleManager interface allows a client to perform various lifecycle
|
||||||
|
@ -108,7 +109,8 @@ import com.raytheon.uf.edex.registry.events.CreateAuditTrailEvent;
|
||||||
* Separate update from create notifications.
|
* Separate update from create notifications.
|
||||||
* 12/2/2013 1829 bphillip Auditable events are not genereted via messages on the event bus
|
* 12/2/2013 1829 bphillip Auditable events are not genereted via messages on the event bus
|
||||||
* 01/21/2014 2613 bphillip Removed verbose log message from removeObjects
|
* 01/21/2014 2613 bphillip Removed verbose log message from removeObjects
|
||||||
* 2/19/2014 2769 bphillip Added current time to audit trail events
|
* 2/19/2014 2769 bphillip Added current time to audit trail events
|
||||||
|
* 4/11/2014 3011 bphillip Modified merge behavior
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
|
@ -417,7 +419,7 @@ public class LifecycleManagerImpl implements LifecycleManager {
|
||||||
*/
|
*/
|
||||||
checkReplica(request, obj, existingObject);
|
checkReplica(request, obj, existingObject);
|
||||||
objsUpdated.add(obj);
|
objsUpdated.add(obj);
|
||||||
registryObjectDao.merge(obj, existingObject);
|
mergeObjects(obj, existingObject);
|
||||||
statusHandler.info("Object [" + objectId
|
statusHandler.info("Object [" + objectId
|
||||||
+ "] replaced in the registry.");
|
+ "] replaced in the registry.");
|
||||||
}
|
}
|
||||||
|
@ -737,7 +739,7 @@ public class LifecycleManagerImpl implements LifecycleManager {
|
||||||
+ "...");
|
+ "...");
|
||||||
RegistryObjectType updatedObject = applyUpdates(objToUpdate,
|
RegistryObjectType updatedObject = applyUpdates(objToUpdate,
|
||||||
updateActions);
|
updateActions);
|
||||||
registryObjectDao.merge(updatedObject, objToUpdate);
|
mergeObjects(updatedObject, objToUpdate);
|
||||||
}
|
}
|
||||||
if (!objectsToUpdate.isEmpty()) {
|
if (!objectsToUpdate.isEmpty()) {
|
||||||
EventBus.publish(new CreateAuditTrailEvent(request.getId(),
|
EventBus.publish(new CreateAuditTrailEvent(request.getId(),
|
||||||
|
@ -752,6 +754,14 @@ public class LifecycleManagerImpl implements LifecycleManager {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void mergeObjects(RegistryObjectType newObject,
|
||||||
|
RegistryObjectType existingObject) {
|
||||||
|
registryObjectDao.merge(newObject, existingObject);
|
||||||
|
DeleteSlotEvent deleteSlotEvent = new DeleteSlotEvent(
|
||||||
|
existingObject.getSlot());
|
||||||
|
EventBus.publish(deleteSlotEvent);
|
||||||
|
}
|
||||||
|
|
||||||
private RegistryObjectType applyUpdates(RegistryObjectType objectToUpdate,
|
private RegistryObjectType applyUpdates(RegistryObjectType objectToUpdate,
|
||||||
List<UpdateActionType> updateActions) throws MsgRegistryException {
|
List<UpdateActionType> updateActions) throws MsgRegistryException {
|
||||||
for (UpdateActionType updateAction : updateActions) {
|
for (UpdateActionType updateAction : updateActions) {
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
/**
|
||||||
|
* This software was developed and / or modified by Raytheon Company,
|
||||||
|
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||||
|
*
|
||||||
|
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||||
|
* This software product contains export-restricted data whose
|
||||||
|
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||||
|
* to non-U.S. persons whether in the United States or abroad requires
|
||||||
|
* an export license or other authorization.
|
||||||
|
*
|
||||||
|
* Contractor Name: Raytheon Company
|
||||||
|
* Contractor Address: 6825 Pine Street, Suite 340
|
||||||
|
* Mail Stop B8
|
||||||
|
* Omaha, NE 68106
|
||||||
|
* 402.291.0100
|
||||||
|
*
|
||||||
|
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||||
|
* further licensing information.
|
||||||
|
**/
|
||||||
|
package com.raytheon.uf.edex.registry.events;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
|
||||||
|
|
||||||
|
import com.raytheon.uf.common.event.Event;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event containing slots to be deleted by the registry garbage collector
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
*
|
||||||
|
* SOFTWARE HISTORY
|
||||||
|
*
|
||||||
|
* Date Ticket# Engineer Description
|
||||||
|
* ------------ ---------- ----------- --------------------------
|
||||||
|
* 4/11/2014 3011 bphillip Initial Coding
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* @author bphillip
|
||||||
|
* @version 1
|
||||||
|
*/
|
||||||
|
public class DeleteSlotEvent extends Event {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -2818002679753482984L;
|
||||||
|
|
||||||
|
private List<SlotType> slotsToDelete;
|
||||||
|
|
||||||
|
public DeleteSlotEvent(){
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public DeleteSlotEvent(List<SlotType> slotsToDelete){
|
||||||
|
this.slotsToDelete = slotsToDelete;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<SlotType> getSlotsToDelete() {
|
||||||
|
return slotsToDelete;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSlotsToDelete(List<SlotType> slotsToDelete) {
|
||||||
|
this.slotsToDelete = slotsToDelete;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue