diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/src/com/raytheon/uf/edex/datadelivery/registry/federation/RegistryFederationManager.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/src/com/raytheon/uf/edex/datadelivery/registry/federation/RegistryFederationManager.java index b4b386c947..ae1236c060 100644 --- a/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/src/com/raytheon/uf/edex/datadelivery/registry/federation/RegistryFederationManager.java +++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/src/com/raytheon/uf/edex/datadelivery/registry/federation/RegistryFederationManager.java @@ -21,7 +21,6 @@ package com.raytheon.uf.edex.datadelivery.registry.federation; import java.io.File; import java.io.FileNotFoundException; -import java.net.URLEncoder; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Calendar; @@ -44,12 +43,16 @@ import oasis.names.tc.ebxml.regrep.wsdl.registry.services.v4.MsgRegistryExceptio import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.Mode; import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.RemoveObjectsRequest; import oasis.names.tc.ebxml.regrep.xsd.lcm.v4.SubmitObjectsRequest; +import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryRequest; +import oasis.names.tc.ebxml.regrep.xsd.query.v4.QueryResponse; +import oasis.names.tc.ebxml.regrep.xsd.query.v4.ResponseOptionType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.AssociationType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.FederationType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectRefListType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.ObjectRefType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.OrganizationType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.PersonType; +import oasis.names.tc.ebxml.regrep.xsd.rim.v4.QueryType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectListType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryObjectType; import oasis.names.tc.ebxml.regrep.xsd.rim.v4.RegistryType; @@ -74,12 +77,16 @@ import com.raytheon.uf.common.localization.LocalizationFile; import com.raytheon.uf.common.localization.PathManagerFactory; import com.raytheon.uf.common.localization.exception.LocalizationOpFailedException; import com.raytheon.uf.common.registry.constants.ActionTypes; +import com.raytheon.uf.common.registry.constants.CanonicalQueryTypes; import com.raytheon.uf.common.registry.constants.DeletionScope; +import com.raytheon.uf.common.registry.constants.QueryLanguages; +import com.raytheon.uf.common.registry.constants.QueryReturnTypes; import com.raytheon.uf.common.registry.constants.RegistryObjectTypes; import com.raytheon.uf.common.registry.constants.StatusTypes; import com.raytheon.uf.common.registry.ebxml.RegistryUtil; import com.raytheon.uf.common.registry.services.RegistrySOAPServices; import com.raytheon.uf.common.registry.services.RegistryServiceException; +import com.raytheon.uf.common.registry.services.rest.response.RestCollectionResponse; import com.raytheon.uf.common.serialization.JAXBManager; import com.raytheon.uf.common.serialization.SerializationException; import com.raytheon.uf.common.status.IUFStatusHandler; @@ -99,6 +106,7 @@ import com.raytheon.uf.edex.registry.ebxml.dao.RegistryObjectDao; import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException; import com.raytheon.uf.edex.registry.ebxml.exception.NoReplicationServersAvailableException; import com.raytheon.uf.edex.registry.ebxml.init.RegistryInitializedListener; +import com.raytheon.uf.edex.registry.ebxml.services.query.QueryConstants; import com.raytheon.uf.edex.registry.ebxml.services.query.RegistryQueryUtil; import com.raytheon.uf.edex.registry.ebxml.util.EbxmlObjectUtil; import com.raytheon.uf.edex.registry.events.CreateAuditTrailEvent; @@ -159,6 +167,13 @@ public class RegistryFederationManager implements IRegistryFederationManager, protected static final IUFStatusHandler statusHandler = UFStatus .getHandler(RegistryFederationManager.class); + /** Query used for synchronizing registries */ + private static final String SYNC_QUERY = "FROM RegistryObjectType obj where obj.id in (%s) order by obj.id asc"; + + /** Batch size for registry synchronization queries */ + private static final int SYNC_BATCH_SIZE = Integer.parseInt(System + .getProperty("ebxml-notification-batch-size")); + private static Set replicatedObjectTypes = new HashSet(); public static final String FEDERATION_ID = "Registry Federation"; @@ -593,44 +608,112 @@ public class RegistryFederationManager implements IRegistryFederationManager, String remoteRegistryUrl = remoteRegistry.getBaseURL(); for (final String objectType : replicatedObjectTypes) { - registryObjectDao - .deleteAll(registryObjectDao - .query("FROM RegistryObjectType obj where obj.objectType=:objectType", - "objectType", objectType)); - - final Collection remoteIds = dataDeliveryRestClient - .getRegistryDataAccessService(remoteRegistryUrl) - .getRegistryObjectIdsOfType(objectType).getPayload(); - - if (CollectionUtil.isNullOrEmpty(remoteIds)) { - statusHandler.info("No objects present of type: " + objectType); - continue; - } - statusHandler.info("Synchronizing " + remoteIds.size() - + " objects of type: " + objectType); - try { - - for (String remoteId : remoteIds) { - RegistryObjectType remoteObject = dataDeliveryRestClient - .getRegistryObject(remoteRegistryUrl, - URLEncoder.encode(remoteId, "UTF-8") - .replaceAll("\\+", "%20")); - if (remoteObject != null) { - registryObjectDao.create(remoteObject); - } - - } - } catch (Exception e) { - throw new RuntimeException("Error synching object type [" - + objectType + "] with registry at [" - + remoteRegistryUrl + "]", e); - } + syncObjectType(objectType, remoteRegistryUrl); } statusHandler.info("Registry synchronization using [" + remoteRegistryUrl + "] completed successfully in " + (TimeUtil.currentTimeMillis() - start) + " ms"); } + /** + * Synchronizes objects of the specified type with the specified remote + * registry + * + * @param objectType + * The object type to synchronize + * @param remoteRegistryUrl + * The url of the remote registry + * @throws EbxmlRegistryException + * If there are errors deleting existing objects + * @throws MsgRegistryException + * If there are errors executing the remote soap query + */ + public void syncObjectType(String objectType, String remoteRegistryUrl) + throws EbxmlRegistryException, MsgRegistryException { + + statusHandler.info("Deleting objects of type: " + objectType); + // Executing a select before delete since Hibernate doesn't cascade + // deletes properly when directly deleting with HQL + registryObjectDao.deleteAll(registryObjectDao.query( + "FROM RegistryObjectType obj where obj.objectType=:objectType", + "objectType", objectType)); + registryObjectDao.flushAndClearSession(); + + // Get the list of remote object ids so we can check later to ensure all + // objects were retrieved + RestCollectionResponse response = dataDeliveryRestClient + .getRegistryDataAccessService(remoteRegistryUrl) + .getRegistryObjectIdsOfType(objectType); + if (response.getPayload() == null) { + statusHandler.info("0 objects of type [" + objectType + + "] present on remote registry. Skipping."); + + } else { + List remoteIds = new ArrayList( + response.getPayload()); + + statusHandler.info("Synchronizing " + remoteIds.size() + + " objects of type [" + objectType + "]"); + int batches = remoteIds.size() / SYNC_BATCH_SIZE; + int remainder = remoteIds.size() % SYNC_BATCH_SIZE; + + for (int currentBatch = 0; currentBatch < batches; currentBatch++) { + persistBatch(objectType, remoteRegistryUrl, remoteIds.subList( + currentBatch * SYNC_BATCH_SIZE, (currentBatch + 1) + * SYNC_BATCH_SIZE)); + } + // Grab any remaining + if (remainder > 0) { + persistBatch( + objectType, + remoteRegistryUrl, + remoteIds.subList(batches * SYNC_BATCH_SIZE, + remoteIds.size())); + } + } + } + + private void persistBatch(String objectType, String remoteRegistryUrl, + List batch) throws MsgRegistryException { + + // Average length of ids of registry object is 52. Add 3 for quotes and + // comma + StringBuilder builder = new StringBuilder(55 * batch.size()); + for (int i = 0; i < batch.size(); i++) { + builder.append("'").append(batch.get(i)).append("'"); + if (i != batch.size() - 1) { + builder.append(","); + } + } + + SlotType queryLanguageSlot = new SlotType( + QueryConstants.QUERY_LANGUAGE, new StringValueType( + QueryLanguages.HQL)); + SlotType queryExpressionSlot = new SlotType( + QueryConstants.QUERY_EXPRESSION, new StringValueType("")); + QueryRequest queryRequest = new QueryRequest(); + QueryType query = new QueryType(); + query.setQueryDefinition(CanonicalQueryTypes.ADHOC_QUERY); + query.getSlot().add(queryLanguageSlot); + query.getSlot().add(queryExpressionSlot); + queryRequest.setQuery(query); + queryRequest.setResponseOption(new ResponseOptionType( + QueryReturnTypes.REGISTRY_OBJECT, true)); + queryRequest.setId("Synchronizing object type: " + objectType); + StringValueType queryValue = new StringValueType(String.format( + SYNC_QUERY, builder.toString())); + queryExpressionSlot.setSlotValue(queryValue); + + QueryResponse queryResponse = soapService.getQueryServiceForHost( + remoteRegistryUrl).executeQuery(queryRequest); + List queryResult = queryResponse + .getRegistryObjects(); + if (!CollectionUtil.isNullOrEmpty(queryResult)) { + registryObjectDao.persistAll(queryResult); + registryObjectDao.flushAndClearSession(); + } + } + @GET @Path("isFederated") @Transactional