Issue #3011 Fixed slot cleanup
Change-Id: I346b0e2c17bcc0f3bcf946772694c127e8baffa5 Former-commit-id:f4169e4e0c
[formerlyb0d162ccba
] [formerlyb2686e46cf
] [formerlyf4169e4e0c
[formerlyb0d162ccba
] [formerlyb2686e46cf
] [formerlyb86fae838f
[formerlyb2686e46cf
[formerly 4ac3c1a55fe7e99d7e9049ffb401b8ea3caf7983]]]] Former-commit-id:b86fae838f
Former-commit-id:d44b84bdbc
[formerly1d600c8559
] [formerly 6f37719194fea49780dffc9b608b92540757b08a [formerlyd8408a9b3f
]] Former-commit-id: d6efca7ac67e33e080e4754a0bc9c5180bf53e82 [formerly201f25eab3
] Former-commit-id:160f48406a
This commit is contained in:
parent
7c83f534f0
commit
50fd573a75
5 changed files with 128 additions and 71 deletions
|
@ -95,6 +95,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
import com.raytheon.uf.common.util.StringUtil;
|
||||
import com.raytheon.uf.edex.core.EDEXUtil;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.availability.FederatedRegistryMonitor;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.dao.ReplicationEventDao;
|
||||
import com.raytheon.uf.edex.datadelivery.registry.replication.NotificationServers;
|
||||
|
@ -104,6 +105,7 @@ import com.raytheon.uf.edex.registry.ebxml.dao.DbInit;
|
|||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryDao;
|
||||
import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
|
||||
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
|
||||
import com.raytheon.uf.edex.registry.ebxml.exception.NoReplicationServersAvailableException;
|
||||
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
|
||||
import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants;
|
||||
import com.raytheon.uf.edex.registry.ebxml.services.query.RegistryQueryUtil;
|
||||
|
@ -201,14 +203,7 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
*/
|
||||
private static final long MAX_DOWN_TIME_DURATION = TimeUtil.MILLIS_PER_HOUR * 48;
|
||||
|
||||
private static final String SYNC_WARNING_MSG = "Registry is out of sync with federation. Registry Synchronization required. Go to: ["
|
||||
+ RegistryUtil.LOCAL_REGISTRY_ADDRESS
|
||||
+ "/registry/federation/status.html] to synchronize.";
|
||||
|
||||
private static volatile boolean SYNC_NECESSARY = false;
|
||||
|
||||
public static AtomicBoolean SYNC_IN_PROGRESS = new AtomicBoolean(
|
||||
false);
|
||||
public static AtomicBoolean SYNC_IN_PROGRESS = new AtomicBoolean(false);
|
||||
|
||||
/** Cutoff parameter for the query to get the expired events */
|
||||
private static final String GET_EXPIRED_EVENTS_QUERY_CUTOFF_PARAMETER = "cutoff";
|
||||
|
@ -326,9 +321,6 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
throw new EbxmlRegistryException(
|
||||
"Error joining federation!!");
|
||||
}
|
||||
if (!centralRegistry) {
|
||||
checkDownTime();
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
throw new EbxmlRegistryException(
|
||||
"Error initializing RegistryReplicationManager", e1);
|
||||
|
@ -350,29 +342,6 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
initialized.set(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks how long a registry has been down. If the registry has been down
|
||||
* longer than the MAX_DOWN_TIME_DURATION, then a sync is necessary
|
||||
*
|
||||
* @see RegistryFederationManager.MAX_DOWN_TIME_DURATION
|
||||
* @throws Exception
|
||||
*/
|
||||
private void checkDownTime() throws Exception {
|
||||
long currentTime = TimeUtil.currentTimeMillis();
|
||||
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
|
||||
long downTime = currentTime - lastKnownUp;
|
||||
statusHandler.info("Registry has been down since: "
|
||||
+ new Date(currentTime - downTime));
|
||||
/*
|
||||
* The registry has been down for ~2 days, this requires a
|
||||
* synchronization of the data from the federation
|
||||
*/
|
||||
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION) {
|
||||
SYNC_NECESSARY = true;
|
||||
sendSyncMessage();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean joinFederation() {
|
||||
try {
|
||||
final List<RegistryObjectType> objects = new ArrayList<RegistryObjectType>(
|
||||
|
@ -509,6 +478,62 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks how long a registry has been down. If the registry has been down
|
||||
* for over 2 days, the registry is synchronized with one of the federation
|
||||
* members
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
private void synchronize() throws EbxmlRegistryException {
|
||||
|
||||
monitorHandler.warn("Synchronizing registry with federation...");
|
||||
RegistryType registryToSyncFrom = null;
|
||||
for (String remoteRegistryId : servers.getRegistryReplicationServers()) {
|
||||
statusHandler.info("Checking availability of [" + remoteRegistryId
|
||||
+ "]...");
|
||||
RegistryType remoteRegistry = null;
|
||||
try {
|
||||
remoteRegistry = dataDeliveryRestClient.getRegistryObject(
|
||||
ncfAddress, remoteRegistryId
|
||||
+ FederationProperties.REGISTRY_SUFFIX);
|
||||
} catch (Exception e) {
|
||||
throw new EbxmlRegistryException(
|
||||
"Error getting remote registry object!", e);
|
||||
}
|
||||
if (remoteRegistry == null) {
|
||||
statusHandler
|
||||
.warn("Registry at ["
|
||||
+ remoteRegistryId
|
||||
+ "] not found in federation. Unable to use as synchronization source.");
|
||||
} else if (dataDeliveryRestClient
|
||||
.isRegistryAvailable(remoteRegistry.getBaseURL())) {
|
||||
registryToSyncFrom = remoteRegistry;
|
||||
break;
|
||||
} else {
|
||||
statusHandler
|
||||
.info("Registry at ["
|
||||
+ remoteRegistryId
|
||||
+ "] is not available. Unable to use as synchronization source.");
|
||||
}
|
||||
}
|
||||
|
||||
// No available registry was found!
|
||||
if (registryToSyncFrom == null) {
|
||||
throw new NoReplicationServersAvailableException(
|
||||
"No available registries found! Registry data will not be synchronized with the federation!");
|
||||
} else {
|
||||
try {
|
||||
synchronizeWithRegistry(registryToSyncFrom.getId());
|
||||
} catch (Exception e) {
|
||||
throw new EbxmlRegistryException(
|
||||
"Error synchronizing with registry ["
|
||||
+ registryToSyncFrom.getId() + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronizes this registry's data with the registry at the specified URL
|
||||
*
|
||||
|
@ -523,6 +548,7 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
@Path("synchronizeWithRegistry/{registryId}")
|
||||
public void synchronizeWithRegistry(@PathParam("registryId")
|
||||
String registryId) throws Exception {
|
||||
|
||||
if (SYNC_IN_PROGRESS.compareAndSet(false, true)) {
|
||||
try {
|
||||
monitorHandler.handle(Priority.WARN,
|
||||
|
@ -551,7 +577,6 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
for (final String objectType : replicatedObjectTypes) {
|
||||
syncObjectType(objectType, remoteRegistryUrl);
|
||||
}
|
||||
SYNC_NECESSARY = false;
|
||||
federatedRegistryMonitor.updateTime();
|
||||
StringBuilder syncMsg = new StringBuilder();
|
||||
|
||||
|
@ -565,6 +590,8 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
} finally {
|
||||
SYNC_IN_PROGRESS.set(false);
|
||||
}
|
||||
} else {
|
||||
statusHandler.info("Registry sync already in progress.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -669,13 +696,6 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
}
|
||||
}
|
||||
|
||||
private void sendSyncMessage() {
|
||||
if (!SYNC_IN_PROGRESS.get()) {
|
||||
statusHandler.warn(SYNC_WARNING_MSG);
|
||||
monitorHandler.handle(Priority.WARN, SYNC_WARNING_MSG);
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("isFederated")
|
||||
@Transactional
|
||||
|
@ -1094,14 +1114,31 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
* Updates the record in the registry that keeps track of if this registry
|
||||
* has been up. This method is called every minute via a quartz cron
|
||||
* configured in Camel
|
||||
*
|
||||
* @throws EbxmlRegistryException
|
||||
*/
|
||||
@Transactional
|
||||
public void updateUpTime() {
|
||||
if (initialized.get()) {
|
||||
if (SYNC_NECESSARY) {
|
||||
if (!SYNC_IN_PROGRESS.get()
|
||||
&& TimeUtil.newGmtCalendar().get(Calendar.MINUTE) % 15 == 0) {
|
||||
sendSyncMessage();
|
||||
public void updateUpTime() throws EbxmlRegistryException {
|
||||
if (initialized.get() && EDEXUtil.isRunning()) {
|
||||
long currentTime = TimeUtil.currentTimeMillis();
|
||||
long lastKnownUp = federatedRegistryMonitor.getLastKnownUptime();
|
||||
long downTime = currentTime - lastKnownUp;
|
||||
if (currentTime - lastKnownUp > MAX_DOWN_TIME_DURATION
|
||||
&& !centralRegistry) {
|
||||
if (!SYNC_IN_PROGRESS.get()) {
|
||||
statusHandler.info("Registry has been down since: "
|
||||
+ new Date(currentTime - downTime));
|
||||
statusHandler
|
||||
.warn("Registry is out of sync with federation. Attempting to automatically synchronize");
|
||||
try {
|
||||
synchronize();
|
||||
monitorHandler
|
||||
.info("Registry successfully synchronized!");
|
||||
} catch (EbxmlRegistryException e) {
|
||||
monitorHandler
|
||||
.error("Error synchronizing registry!", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
federatedRegistryMonitor.updateTime();
|
||||
|
@ -1211,4 +1248,4 @@ public class RegistryFederationManager implements IRegistryFederationManager,
|
|||
return servers;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -19,13 +19,16 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.registry.ebxml.dao;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
|
||||
|
||||
import org.hibernate.SQLQuery;
|
||||
import org.hibernate.criterion.Property;
|
||||
|
||||
import com.raytheon.uf.edex.database.dao.SessionManagedDao;
|
||||
import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -84,4 +87,10 @@ public class SlotTypeDao extends SessionManagedDao<String, SlotType> {
|
|||
this.template.delete(slot);
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteBySlotId(Collection<String> ids){
|
||||
template.deleteAll(createCriteria().add(
|
||||
Property.forName(QueryConstants.ID).in(ids))
|
||||
.list());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import com.raytheon.uf.edex.registry.events.DeleteSlotEvent;
|
|||
* 2/4/2014 2769 bphillip Removed flush and clear call
|
||||
* 2/13/2014 2769 bphillip Refactored to no longer use executor threads
|
||||
* 4/11/2014 3011 bphillip Added slot purging via event bus notifications
|
||||
* 4/17/2014 3011 bphillip Delete slot events now contain strings
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -143,7 +144,7 @@ public class RegistryGarbageCollector {
|
|||
long start = TimeUtil.currentTimeMillis();
|
||||
statusHandler.info("Deleting "
|
||||
+ slotEvent.getSlotsToDelete().size() + " slots...");
|
||||
slotDao.deleteAll(slotEvent.getSlotsToDelete());
|
||||
slotDao.deleteBySlotId(slotEvent.getSlotsToDelete());
|
||||
statusHandler.info("Deleted " + slotEvent.getSlotsToDelete().size()
|
||||
+ " slots in " + (TimeUtil.currentTimeMillis() - start)
|
||||
+ " ms");
|
||||
|
|
|
@ -111,6 +111,7 @@ import com.raytheon.uf.edex.registry.events.DeleteSlotEvent;
|
|||
* 01/21/2014 2613 bphillip Removed verbose log message from removeObjects
|
||||
* 2/19/2014 2769 bphillip Added current time to audit trail events
|
||||
* 4/11/2014 3011 bphillip Modified merge behavior
|
||||
* 4/17/2014 3011 bphillip Delete slot events now contain strings
|
||||
*
|
||||
*
|
||||
* </pre>
|
||||
|
@ -300,6 +301,8 @@ public class LifecycleManagerImpl implements LifecycleManager {
|
|||
event.setObjectType(objectType);
|
||||
EventBus.publish(event);
|
||||
}
|
||||
DeleteSlotEvent deleteEvent = new DeleteSlotEvent(obj.getSlot());
|
||||
EventBus.publish(deleteEvent);
|
||||
EventBus.publish(new RegistryStatisticsEvent(obj.getObjectType(),
|
||||
obj.getStatus(), obj.getOwner(), avTimePerRecord));
|
||||
}
|
||||
|
@ -756,9 +759,8 @@ public class LifecycleManagerImpl implements LifecycleManager {
|
|||
|
||||
private void mergeObjects(RegistryObjectType newObject,
|
||||
RegistryObjectType existingObject) {
|
||||
DeleteSlotEvent deleteSlotEvent = new DeleteSlotEvent(existingObject.getSlot());
|
||||
registryObjectDao.merge(newObject, existingObject);
|
||||
DeleteSlotEvent deleteSlotEvent = new DeleteSlotEvent(
|
||||
existingObject.getSlot());
|
||||
EventBus.publish(deleteSlotEvent);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,11 +19,13 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.registry.events;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.SlotType;
|
||||
|
||||
import com.raytheon.uf.common.event.Event;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
|
||||
/**
|
||||
* Event containing slots to be deleted by the registry garbage collector
|
||||
|
@ -35,6 +37,7 @@ import com.raytheon.uf.common.event.Event;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* 4/11/2014 3011 bphillip Initial Coding
|
||||
* 4/17/2014 3011 bphillip Delete slot events now contain strings
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -42,26 +45,31 @@ import com.raytheon.uf.common.event.Event;
|
|||
*/
|
||||
public class DeleteSlotEvent extends Event {
|
||||
|
||||
private static final long serialVersionUID = -2818002679753482984L;
|
||||
|
||||
private List<SlotType> slotsToDelete;
|
||||
|
||||
public DeleteSlotEvent(){
|
||||
super();
|
||||
}
|
||||
|
||||
public DeleteSlotEvent(List<SlotType> slotsToDelete){
|
||||
this.slotsToDelete = slotsToDelete;
|
||||
}
|
||||
private static final long serialVersionUID = -2818002679753482984L;
|
||||
|
||||
public List<SlotType> getSlotsToDelete() {
|
||||
return slotsToDelete;
|
||||
}
|
||||
private List<String> slotsToDelete;;
|
||||
|
||||
public void setSlotsToDelete(List<SlotType> slotsToDelete) {
|
||||
this.slotsToDelete = slotsToDelete;
|
||||
}
|
||||
|
||||
|
||||
public DeleteSlotEvent() {
|
||||
super();
|
||||
}
|
||||
|
||||
public DeleteSlotEvent(List<SlotType> slotsToDelete) {
|
||||
if (CollectionUtil.isNullOrEmpty(slotsToDelete)) {
|
||||
slotsToDelete = new ArrayList<SlotType>();
|
||||
} else {
|
||||
this.slotsToDelete = new ArrayList<String>(slotsToDelete.size());
|
||||
for (SlotType slot : slotsToDelete) {
|
||||
this.slotsToDelete.add(slot.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getSlotsToDelete() {
|
||||
return slotsToDelete;
|
||||
}
|
||||
|
||||
public void setSlotsToDelete(List<String> slotsToDelete) {
|
||||
this.slotsToDelete = slotsToDelete;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue