Issue #2769: Added batching to registry sync

Change-Id: I69e05ee148ea8bec6a5d21bd562349a9e2d557db

Former-commit-id: 7ad21da089 [formerly 7bf51d507e] [formerly 59323be87e] [formerly 7ad21da089 [formerly 7bf51d507e] [formerly 59323be87e] [formerly 9fd35140f0 [formerly 59323be87e [formerly a265027cacde7690171d30455227c5a843e0d397]]]]
Former-commit-id: 9fd35140f0
Former-commit-id: fe6d70a38f [formerly daeb8bec5f] [formerly 5c8dacb79a884329e899f0613609bd34935428e9 [formerly cd04d7dcbe]]
Former-commit-id: 80271981040d34f22e2ae5038a50b6a25a0fe9ba [formerly 6b6a5b0e4f]
Former-commit-id: d0999c05e8
This commit is contained in:
Benjamin Phillippe 2014-03-04 13:12:33 -06:00
parent f451950ced
commit d51e7a67a0

View file

@ -21,7 +21,6 @@ package com.raytheon.uf.edex.datadelivery.registry.federation;
import java.io.File;
import java.io.FileNotFoundException;
import java.net.URLEncoder;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Calendar;
@ -44,12 +43,16 @@ import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.MsgRegistryExceptio
import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.Mode;
import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.RemoveObjectsRequest;
import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.SubmitObjectsRequest;
import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryRequest;
import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryResponse;
import oasis.names.tc.ebxml.regrep.xsd.query.v4.ResponseOptionType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AssociationType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.FederationType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectRefListType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectRefType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.OrganizationType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.PersonType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.QueryType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectListType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType;
import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryType;
@ -74,12 +77,16 @@ import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.localization.exception.LocalizationOpFailedException;
import com.raytheon.uf.common.registry.constants.ActionTypes;
import com.raytheon.uf.common.registry.constants.CanonicalQueryTypes;
import com.raytheon.uf.common.registry.constants.DeletionScope;
import com.raytheon.uf.common.registry.constants.QueryLanguages;
import com.raytheon.uf.common.registry.constants.QueryReturnTypes;
import com.raytheon.uf.common.registry.constants.RegistryObjectTypes;
import com.raytheon.uf.common.registry.constants.StatusTypes;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.registry.services.RegistrySOAPServices;
import com.raytheon.uf.common.registry.services.RegistryServiceException;
import com.raytheon.uf.common.registry.services.rest.response.RestCollectionResponse;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IUFStatusHandler;
@ -99,6 +106,7 @@ import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao;
import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
import com.raytheon.uf.edex.registry.ebxml.exception.NoReplicationServersAvailableException;
import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener;
import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants;
import com.raytheon.uf.edex.registry.ebxml.services.query.RegistryQueryUtil;
import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil;
import com.raytheon.uf.edex.registry.events.CreateAuditTrailEvent;
@ -159,6 +167,13 @@ public class RegistryFederationManager implements IRegistryFederationManager,
protected static final IUFStatusHandler statusHandler = UFStatus
.getHandler(RegistryFederationManager.class);
/** Query used for synchronizing registries */
private static final String SYNC_QUERY = "FROM RegistryObjectType obj where obj.id in (%s) order by obj.id asc";
/** Batch size for registry synchronization queries */
private static final int SYNC_BATCH_SIZE = Integer.parseInt(System
.getProperty("ebxml-notification-batch-size"));
private static Set<String> replicatedObjectTypes = new HashSet<String>();
public static final String FEDERATION_ID = "Registry Federation";
@ -593,44 +608,112 @@ public class RegistryFederationManager implements IRegistryFederationManager,
String remoteRegistryUrl = remoteRegistry.getBaseURL();
for (final String objectType : replicatedObjectTypes) {
registryObjectDao
.deleteAll(registryObjectDao
.query("FROM RegistryObjectType obj where obj.objectType=:objectType",
"objectType", objectType));
final Collection<String> remoteIds = dataDeliveryRestClient
.getRegistryDataAccessService(remoteRegistryUrl)
.getRegistryObjectIdsOfType(objectType).getPayload();
if (CollectionUtil.isNullOrEmpty(remoteIds)) {
statusHandler.info("No objects present of type: " + objectType);
continue;
}
statusHandler.info("Synchronizing " + remoteIds.size()
+ " objects of type: " + objectType);
try {
for (String remoteId : remoteIds) {
RegistryObjectType remoteObject = dataDeliveryRestClient
.getRegistryObject(remoteRegistryUrl,
URLEncoder.encode(remoteId, "UTF-8")
.replaceAll("\\+", "%20"));
if (remoteObject != null) {
registryObjectDao.create(remoteObject);
}
}
} catch (Exception e) {
throw new RuntimeException("Error synching object type ["
+ objectType + "] with registry at ["
+ remoteRegistryUrl + "]", e);
}
syncObjectType(objectType, remoteRegistryUrl);
}
statusHandler.info("Registry synchronization using ["
+ remoteRegistryUrl + "] completed successfully in "
+ (TimeUtil.currentTimeMillis() - start) + " ms");
}
/**
* Synchronizes objects of the specified type with the specified remote
* registry
*
* @param objectType
* The object type to synchronize
* @param remoteRegistryUrl
* The url of the remote registry
* @throws EbxmlRegistryException
* If there are errors deleting existing objects
* @throws MsgRegistryException
* If there are errors executing the remote soap query
*/
public void syncObjectType(String objectType, String remoteRegistryUrl)
throws EbxmlRegistryException, MsgRegistryException {
statusHandler.info("Deleting objects of type: " + objectType);
// Executing a select before delete since Hibernate doesn't cascade
// deletes properly when directly deleting with HQL
registryObjectDao.deleteAll(registryObjectDao.query(
"FROM RegistryObjectType obj where obj.objectType=:objectType",
"objectType", objectType));
registryObjectDao.flushAndClearSession();
// Get the list of remote object ids so we can check later to ensure all
// objects were retrieved
RestCollectionResponse<String> response = dataDeliveryRestClient
.getRegistryDataAccessService(remoteRegistryUrl)
.getRegistryObjectIdsOfType(objectType);
if (response.getPayload() == null) {
statusHandler.info("0 objects of type [" + objectType
+ "] present on remote registry. Skipping.");
} else {
List<String> remoteIds = new ArrayList<String>(
response.getPayload());
statusHandler.info("Synchronizing " + remoteIds.size()
+ " objects of type [" + objectType + "]");
int batches = remoteIds.size() / SYNC_BATCH_SIZE;
int remainder = remoteIds.size() % SYNC_BATCH_SIZE;
for (int currentBatch = 0; currentBatch < batches; currentBatch++) {
persistBatch(objectType, remoteRegistryUrl, remoteIds.subList(
currentBatch * SYNC_BATCH_SIZE, (currentBatch + 1)
* SYNC_BATCH_SIZE));
}
// Grab any remaining
if (remainder > 0) {
persistBatch(
objectType,
remoteRegistryUrl,
remoteIds.subList(batches * SYNC_BATCH_SIZE,
remoteIds.size()));
}
}
}
private void persistBatch(String objectType, String remoteRegistryUrl,
List<String> batch) throws MsgRegistryException {
// Average length of ids of registry object is 52. Add 3 for quotes and
// comma
StringBuilder builder = new StringBuilder(55 * batch.size());
for (int i = 0; i < batch.size(); i++) {
builder.append("'").append(batch.get(i)).append("'");
if (i != batch.size() - 1) {
builder.append(",");
}
}
SlotType queryLanguageSlot = new SlotType(
QueryConstants.QUERY_LANGUAGE, new StringValueType(
QueryLanguages.HQL));
SlotType queryExpressionSlot = new SlotType(
QueryConstants.QUERY_EXPRESSION, new StringValueType(""));
QueryRequest queryRequest = new QueryRequest();
QueryType query = new QueryType();
query.setQueryDefinition(CanonicalQueryTypes.ADHOC_QUERY);
query.getSlot().add(queryLanguageSlot);
query.getSlot().add(queryExpressionSlot);
queryRequest.setQuery(query);
queryRequest.setResponseOption(new ResponseOptionType(
QueryReturnTypes.REGISTRY_OBJECT, true));
queryRequest.setId("Synchronizing object type: " + objectType);
StringValueType queryValue = new StringValueType(String.format(
SYNC_QUERY, builder.toString()));
queryExpressionSlot.setSlotValue(queryValue);
QueryResponse queryResponse = soapService.getQueryServiceForHost(
remoteRegistryUrl).executeQuery(queryRequest);
List<RegistryObjectType> queryResult = queryResponse
.getRegistryObjects();
if (!CollectionUtil.isNullOrEmpty(queryResult)) {
registryObjectDao.persistAll(queryResult);
registryObjectDao.flushAndClearSession();
}
}
@GET
@Path("isFederated")
@Transactional