From d81f8ddf89e7f356948619407f3d8a303e224d1f Mon Sep 17 00:00:00 2001 From: Benjamin Phillippe Date: Thu, 16 Jan 2014 15:06:41 -0600 Subject: [PATCH] Issue #2613 Rewrote poorly written section of code in NotificationListenerImpl Change-Id: Ibcdd415667f379a20799a4166b12cc8cc5e05ed4 Former-commit-id: 7516310ab1cddd471b47de1b58203965a41aa509 [formerly 4d4c7837ad1084b8c6007c660c59e3bd29f757cc] Former-commit-id: 6b0d25cf5c126038655e9a73d83b136a8ca47bed --- ...raytheon.uf.edex.registry.ebxml.properties | 2 +- .../NotificationListenerImpl.java | 207 +++++++++++++----- 2 files changed, 155 insertions(+), 54 deletions(-) diff --git a/edexOsgi/com.raytheon.uf.edex.registry.ebxml/resources/com.raytheon.uf.edex.registry.ebxml.properties b/edexOsgi/com.raytheon.uf.edex.registry.ebxml/resources/com.raytheon.uf.edex.registry.ebxml.properties index 1443109d82..b11e318764 100644 --- a/edexOsgi/com.raytheon.uf.edex.registry.ebxml/resources/com.raytheon.uf.edex.registry.ebxml.properties +++ b/edexOsgi/com.raytheon.uf.edex.registry.ebxml/resources/com.raytheon.uf.edex.registry.ebxml.properties @@ -1,7 +1,7 @@ # The period which registry subscriptions are processed ebxml-subscription-process.cron=0/20+*+*+*+*+? # The period which the registry runs the garbage collection -ebxml-garbage-collect-process.cron=*+0/5+*+*+*+? +ebxml-garbage-collect-process.cron=0+0/5+*+*+*+? # The period which adhoc subscriptions are cleaned, every 20 mins adhocsubscription-process.cron=0+0/20+*+*+*+? # When a federation synchonization is necessary, this is the number of threads diff --git a/edexOsgi/com.raytheon.uf.edex.registry.ebxml/src/com/raytheon/uf/edex/registry/ebxml/services/notification/NotificationListenerImpl.java b/edexOsgi/com.raytheon.uf.edex.registry.ebxml/src/com/raytheon/uf/edex/registry/ebxml/services/notification/NotificationListenerImpl.java index 03caf07c38..d2016cb718 100644 --- a/edexOsgi/com.raytheon.uf.edex.registry.ebxml/src/com/raytheon/uf/edex/registry/ebxml/services/notification/NotificationListenerImpl.java +++ b/edexOsgi/com.raytheon.uf.edex.registry.ebxml/src/com/raytheon/uf/edex/registry/ebxml/services/notification/NotificationListenerImpl.java @@ -21,6 +21,7 @@ package com.raytheon.uf.edex.registry.ebxml.services.notification; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import javax.annotation.Resource; @@ -123,8 +124,6 @@ public class NotificationListenerImpl implements NotificationListener { /** Registry soap service client */ private RegistrySOAPServices registrySoapClient; - // @FIXME This method is pretty scary in it's implementation as Richard - // pointed out. Please fix Ben. @Override public void onNotification(NotificationType notification) { long startTime = TimeUtil.currentTimeMillis(); @@ -144,37 +143,28 @@ public class NotificationListenerImpl implements NotificationListener { + ")]"); } - List events = notification.getEvent(); + // Creates a new action list from the received events + ProcessActionList actionList = new ProcessActionList( + notification.getEvent()); - List actionList = new ArrayList(events.size()); - List objIdList = new ArrayList(events.size()); + /* + * Iterate through each action and process them in the order in which + * the were received. Notification types currently supported are create, + * update and delete. All other action types will be ignored and a + * warning message is output into the log + */ + for (ProcessAction action : actionList.getActions()) { + String actionType = action.getActionType(); - for (AuditableEventType event : events) { - List actions = event.getAction(); - for (ActionType action : actions) { - String eventType = action.getEventType(); - List objectIds = getIdsFromAction(action); - objIdList.addAll(objectIds); - for (int i = 0; i < objectIds.size(); i++) { - actionList.add(eventType); - } - } - } - - int listSize = objIdList.size(); - for (int i = 0; i < listSize;) { - List insertIds = new ArrayList(); - while (i < listSize - && (actionList.get(i).equals(ActionTypes.create) || actionList - .get(i).equals(ActionTypes.update))) { - insertIds.add(objIdList.get(i)); - i++; - } - if (!insertIds.isEmpty()) { + /* + * Process creates and updates + */ + if (ActionTypes.create.equals(actionType) + || ActionTypes.update.equals(actionType)) { try { SubmitObjectsRequest submitRequest = createSubmitObjectsRequest( - clientBaseURL, notification.getId(), insertIds, - Mode.CREATE_OR_REPLACE); + clientBaseURL, notification.getId(), + action.getIdList(), Mode.CREATE_OR_REPLACE); lcm.submitObjects(submitRequest); } catch (MsgRegistryException e) { throw new RuntimeException( @@ -184,14 +174,12 @@ public class NotificationListenerImpl implements NotificationListener { "Error creating submit objects request!", e); } } - List deleteIds = new ArrayList(); - while (i < listSize && actionList.get(i).equals(ActionTypes.delete)) { - deleteIds.add(objIdList.get(i)); - i++; - } - if (!deleteIds.isEmpty()) { + /* + * Process deletes + */ + else if (ActionTypes.delete.equals(actionType)) { ObjectRefListType refList = new ObjectRefListType(); - for (String id : deleteIds) { + for (String id : action.getIdList()) { RegistryObjectType object = registryObjectDao.getById(id); if (object != null) { refList.getObjectRef().add(new ObjectRefType(id)); @@ -212,30 +200,22 @@ public class NotificationListenerImpl implements NotificationListener { } } } - } + /* + * Output warning in log that an unsupported action type was + * received + */ + else { + statusHandler.warn("Unsupported action type received [" + + actionType + "]"); + } + } registryDao.flushAndClearSession(); statusHandler.info("Processing notification id [" + notification.getId() + "] completed in " + (TimeUtil.currentTimeMillis() - startTime) + " ms"); } - private List getIdsFromAction(ActionType action) { - List objectIds = new ArrayList(); - if (action.getAffectedObjectRefs() != null) { - for (ObjectRefType ref : action.getAffectedObjectRefs() - .getObjectRef()) { - objectIds.add(ref.getId()); - } - } else if (action.getAffectedObjects() != null) { - for (RegistryObjectType regObj : action.getAffectedObjects() - .getRegistryObject()) { - objectIds.add(regObj.getId()); - } - } - return objectIds; - } - @Override @WebMethod(action = "SynchronousNotification") @WebResult(name = "RegistryResponse", targetNamespace = EbxmlNamespaces.RS_URI, partName = "partRegistryResponse") @@ -371,6 +351,127 @@ public class NotificationListenerImpl implements NotificationListener { return request; } + /** + * This class organizes actions from the received list of auditable events. + * The purpose of this class is to batch together consecutive events having + * the same action type in order to minimize queries to the remote server + * during notification processing. The actions must be processed in the + * order in which the appear in the original auditable event list in order + * to maintain the integrity of the registry across federation members. + * + * @author bphillip + * + */ + private class ProcessActionList { + + /** The list of actions contained in the received auditable events */ + private List actions = new ArrayList(); + + /** + * Creates a new ProcessActionList from the provided list of events + * + * @param events + * The events from which to generate the ProcessActionList + */ + public ProcessActionList(List events) { + if (!CollectionUtil.isNullOrEmpty(events)) { + for (AuditableEventType event : events) { + List actions = event.getAction(); + for (ActionType action : actions) { + addAction(action.getEventType(), + getIdsFromAction(action)); + } + } + } + } + + /** + * Adds an action to the list to be processed + * + * @param actionType + * The type of action + * @param ids + * The ids of the objects on which the action was executed + */ + private void addAction(String actionType, List ids) { + if (actions.isEmpty()) { + actions.add(new ProcessAction(actionType, ids)); + } else { + ProcessAction lastAction = actions.get(actions.size() - 1); + if (lastAction.getActionType().equals(actionType)) { + lastAction.addIds(ids); + } else { + actions.add(new ProcessAction(actionType, ids)); + } + } + + } + + /** + * Extracts the affected object ids from the action + * + * @param action + * The action to get the ids from + * @return The list of ids from the action object + */ + private List getIdsFromAction(ActionType action) { + List objectIds = new ArrayList(); + if (action.getAffectedObjectRefs() != null) { + for (ObjectRefType ref : action.getAffectedObjectRefs() + .getObjectRef()) { + objectIds.add(ref.getId()); + } + } else if (action.getAffectedObjects() != null) { + for (RegistryObjectType regObj : action.getAffectedObjects() + .getRegistryObject()) { + objectIds.add(regObj.getId()); + } + } + return objectIds; + } + + public List getActions() { + return actions; + } + + } + + /** + * Private class encapsulating the list of object ids and the action that + * was executed on them. + * + * @author bphillip + * + */ + private class ProcessAction { + + /** The type of action executed */ + private String actionType; + + /** The list of objects that the action was executed on */ + private List idList; + + public ProcessAction(String actionType, List ids) { + this.actionType = actionType; + addIds(ids); + } + + public String getActionType() { + return actionType; + } + + public List getIdList() { + if (idList == null) { + idList = new LinkedList(); + } + return idList; + } + + public void addIds(List ids) { + getIdList().addAll(ids); + } + } + public void setLcm(LifecycleManagerImpl lcm) { this.lcm = lcm; }