buckets = getWhereStartTimeIsLessThanOrEqualTo(
+ millis, network);
+ // last bucket.
+ if (!buckets.isEmpty()) {
+ return buckets.get(buckets.size() -1);
+ } else {
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/BandwidthUtilizationProcessor.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/BandwidthUtilizationProcessor.java
new file mode 100644
index 0000000000..8c6cbda098
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/BandwidthUtilizationProcessor.java
@@ -0,0 +1,142 @@
+package com.raytheon.uf.edex.datadelivery.bandwidth.registry;
+
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+
+import java.util.Calendar;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import com.raytheon.uf.common.datadelivery.registry.Network;
+import com.raytheon.uf.common.status.IUFStatusHandler;
+import com.raytheon.uf.common.status.UFStatus;
+import com.raytheon.uf.common.status.UFStatus.Priority;
+import com.raytheon.uf.common.time.util.TimeUtil;
+import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
+
+/**
+ * Process the totals gathered by the BandwidthUtilizationListener class
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 27, 2013 1736 dhladky Initial creation
+ *
+ *
+ *
+ * @author dhladky
+ * @version 1.0
+ */
+
+public class BandwidthUtilizationProcessor implements Job {
+
+ private static final IUFStatusHandler statusHandler = UFStatus
+ .getHandler(BandwidthUtilizationProcessor.class);
+
+ /** total bytes saved in map */
+ private AtomicInteger totalBytes = null;
+ /** long last run time */
+ private long lastRun = 0l;
+ /** job data map pertinent to this execution */
+ private JobDataMap map = null;
+ /** network of the traffic be tracked */
+ private Network network = null;
+ /** bucket Dao class */
+ private IBandwidthBucketDao bucketDao = null;
+ /** bucket size in mins */
+ private int bucketSize = 0;
+
+ public BandwidthUtilizationProcessor() {
+
+ }
+
+ @Override
+ public void execute(JobExecutionContext context) throws JobExecutionException {
+
+ this.map = context.getJobDetail().getJobDataMap();
+ this.network = (Network) map.get("network");
+ this.lastRun = map.getLongValue("lastRun");
+ this.totalBytes = (AtomicInteger) map.get("totalBytes");
+ this.bucketDao = (IBandwidthBucketDao) map.get("bucketDao");
+ this.bucketSize = map.getInt("bucketSize");
+
+ // do the processing
+ processRecord();
+ }
+
+ /**
+ * Gets the bytes per second for the interval run
+ * @return bytesPerSec
+ */
+ private int getBytesPerSecondAndReset() {
+
+ long now = TimeUtil.currentTimeMillis();
+ int diffSeconds = (int) (now - lastRun)/1000;
+ int bytesPerSec = totalBytes.getAndSet(0)/diffSeconds;
+ // reset time
+ map.put("lastRun", now);
+
+ return bytesPerSec;
+ }
+
+ /**
+ * Process the record and save to DB
+ */
+ private synchronized void processRecord() {
+
+ RegistryBandwidthService service = new RegistryBandwidthService(bucketDao, network, bucketSize);
+ RegistryBandwidthRecord rbr = null;
+
+ try {
+ rbr = service.getCurrentRegistryBandwidthRecord();
+ } catch (Exception e) {
+ statusHandler.handle(Priority.PROBLEM, "Could not lookup previous Registry Bandwidth Record! ", e);
+ }
+
+ int nowBytes = getBytesPerSecondAndReset();
+
+ if (rbr != null) {
+ // get the previous bytes/per second value
+ int previousBytes = rbr.getBytes();
+ int averageBytes = (previousBytes + nowBytes)/2;
+ rbr.setBytes(averageBytes);
+
+ } else {
+ // brand spanking new
+ Calendar cal = TimeUtil.newGmtCalendar();
+ Long timePeriodKey = service.getTimePeriodKey(cal);
+
+ if (timePeriodKey != null) {
+ rbr = new RegistryBandwidthRecord(timePeriodKey, nowBytes);
+ }
+ }
+
+ service.addorUpdateRecord(rbr);
+ }
+
+
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthDao.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthDao.java
new file mode 100644
index 0000000000..291bc02737
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthDao.java
@@ -0,0 +1,104 @@
+package com.raytheon.uf.edex.datadelivery.bandwidth.registry;
+
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.raytheon.uf.edex.database.DataAccessLayerException;
+import com.raytheon.uf.edex.database.dao.CoreDao;
+import com.raytheon.uf.edex.database.dao.DaoConfig;
+
+/**
+ * Provider Key Dao
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 15, 2013 1736 dhladky Dao for registry bandwidth gathering
+ *
+ *
+ *
+ * @author dhladky
+ * @version 1.0
+ */
+
+public class RegistryBandwidthDao extends CoreDao {
+
+ private static final String FIELD = "timePeriod";
+
+ private static final String GREATERTHANEQUAL = ">=";
+
+ private static final String LESSTHAN = "<";
+
+ /**
+ * Creates a new RegistryBandwidthDao
+ */
+ public RegistryBandwidthDao() {
+ super(DaoConfig.forClass(RegistryBandwidthRecord.class));
+ }
+
+ /**
+ * Retrieves a RegistryBandwidthRecord with the timePeriod
+ * All times are in seconds since 0 GMT of each day
+ * returns null if none exists
+ * @param startTime
+ * @param endTime
+ * @return The Record with the corresponding timePeriod
+ */
+
+ public RegistryBandwidthRecord queryByTimeRange(long startMillis, long endMillis)
+ throws DataAccessLayerException {
+
+ List fields = new ArrayList(2);
+ List values = new ArrayList(2);
+ List operands = new ArrayList(2);
+ fields.add(FIELD);
+ values.add(startMillis);
+ operands.add(GREATERTHANEQUAL);
+ fields.add(FIELD);
+ values.add(endMillis);
+ operands.add(LESSTHAN);
+
+ List> timePeriods = queryByCriteria(fields, values, operands, 1, FIELD, false);
+ if (timePeriods.isEmpty()) {
+ return null;
+ } else {
+ return (RegistryBandwidthRecord)timePeriods.get(0);
+ }
+ }
+
+ /**
+ * Add or update an existing RegistryBandwidthRecord
+ *
+ * @param record
+ */
+ public void addOrUpdateRecord(RegistryBandwidthRecord record)
+ throws Exception {
+
+ // only persist records we have an active period for
+ if (record.getTimePeriod() != null) {
+ persist(record);
+ }
+ }
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthRecord.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthRecord.java
new file mode 100644
index 0000000000..a5f34f0262
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthRecord.java
@@ -0,0 +1,96 @@
+package com.raytheon.uf.edex.datadelivery.bandwidth.registry;
+
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+
+import java.io.Serializable;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+import com.raytheon.uf.common.dataplugin.persist.IPersistableDataObject;
+import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
+import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
+
+/**
+ * Provider Key Record
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 15, 2013 1736 dhladky Store bandwidth entries
+ *
+ *
+ *
+ *
+ * @author dhladky
+ * @version 1.0
+ */
+@Entity
+@Table(name = "dataDeliveryRegistryBandwidth")
+@DynamicSerialize
+public class RegistryBandwidthRecord implements IPersistableDataObject,
+ Serializable {
+
+ private static final long serialVersionUID = 177884683888461814L;
+
+ @Id
+ @DynamicSerializeElement
+ private Long timePeriod;
+
+ @Column(nullable = false)
+ @DynamicSerializeElement
+ private int bytes;
+
+ public RegistryBandwidthRecord() {
+
+ }
+
+ public RegistryBandwidthRecord(long timePeriod, int bytes) {
+ this.timePeriod = timePeriod;
+ this.bytes = bytes;
+ }
+
+ public int getBytes() {
+ return bytes;
+ }
+
+ public void setBytes(int bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public Long getIdentifier() {
+ return timePeriod;
+ }
+
+ public Long getTimePeriod() {
+ return timePeriod;
+ }
+
+ public void setTimePeriod(long timePeriod) {
+ this.timePeriod = timePeriod;
+ }
+
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java
new file mode 100644
index 0000000000..5e18c1915a
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthService.java
@@ -0,0 +1,256 @@
+package com.raytheon.uf.edex.datadelivery.bandwidth.registry;
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+import java.util.Calendar;
+
+import com.raytheon.uf.common.datadelivery.registry.Network;
+import com.raytheon.uf.common.status.IUFStatusHandler;
+import com.raytheon.uf.common.status.UFStatus;
+import com.raytheon.uf.common.status.UFStatus.Priority;
+import com.raytheon.uf.common.time.util.TimeUtil;
+import com.raytheon.uf.edex.database.DataAccessLayerException;
+import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket;
+import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
+/**
+ * Registry Bandwidth Service.
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 16, 2013 1736 dhladky Initial creation
+ *
+ *
+ *
+ * @author dhladky
+ * @version 1.0
+ */
+public class RegistryBandwidthService {
+
+ private static final IUFStatusHandler statusHandler = UFStatus
+ .getHandler(RegistryBandwidthService.class);
+
+ public static final int BYTES_PER_KILOBYTE = 1024;
+
+ private Network network;
+
+ private IBandwidthBucketDao bucketDao;
+
+ private int bucketSize;
+
+ public RegistryBandwidthService() {
+
+ }
+
+ /**
+ * Construct an instance
+ * @param bucketDao
+ * @param network
+ */
+ public RegistryBandwidthService(IBandwidthBucketDao bucketDao, Network network, int bucketSize) {
+ this.bucketDao = bucketDao;
+ this.network = network;
+ this.bucketSize = bucketSize;
+ }
+
+ /**
+ * Gives a time averaged bandwidth for the current time.
+ * @return
+ */
+ public Integer getCurrentRegistryBandwidth() {
+
+ if (network == Network.OPSNET) {
+
+ RegistryBandwidthRecord rbr = getCurrentRegistryBandwidthRecord();
+
+ if (rbr != null) {
+ // convert to kb per/second
+ return convertBytesToKilobytes(rbr.getBytes());
+ } else {
+ statusHandler
+ .handle(Priority.WARN,
+ "No active registry bandwidth information for current time.");
+ }
+ }
+
+ return 0;
+
+ }
+
+ /**
+ * Gives the current full record. Which is the previous record time wise
+ * because the query is back one full bucketSize in millis.
+ * @return
+ */
+ public RegistryBandwidthRecord getCurrentRegistryBandwidthRecord() {
+
+ RegistryBandwidthRecord rbr = null;
+
+ if (network == Network.OPSNET) {
+
+ RegistryBandwidthDao rbd = new RegistryBandwidthDao();
+ Calendar cal = TimeUtil.newGmtCalendar();
+ Long timePeriodKey = getTimePeriodKey(cal);
+
+ if (timePeriodKey != null) {
+ try {
+ long startMillis = timePeriodKey - bucketSize;
+ long endMillis = timePeriodKey;
+ rbr = rbd.queryByTimeRange(startMillis, endMillis);
+ } catch (DataAccessLayerException dale) {
+ statusHandler.handle(Priority.PROBLEM,
+ "Could not lookup Registry Bandwidth Record! ",
+ dale);
+ }
+ }
+ }
+
+ return rbr;
+ }
+
+ /**
+ * Gives a time averaged bandwidth utilization for the registry, time passed
+ * in.
+ *
+ * @param cal
+ * @return
+ */
+ public Integer getRegistryBandwidth(long millis) {
+
+ if (network == Network.OPSNET) {
+
+ Calendar cal = TimeUtil.newGmtCalendar();
+ cal.setTimeInMillis(millis);
+ RegistryBandwidthRecord rbr = getRegistryBandwidthRecord(cal);
+
+ if (rbr != null) {
+ return convertBytesToKilobytes(rbr.getBytes());
+ } else {
+ // No record for this bucket,
+ // try current
+ return getCurrentRegistryBandwidth();
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Retrieve a registry bandwidth record
+ * @param cal
+ * @return
+ */
+ public RegistryBandwidthRecord getRegistryBandwidthRecord(Calendar cal) {
+
+ RegistryBandwidthDao rbd = new RegistryBandwidthDao();
+ Long timePeriodKey = getTimePeriodKey(cal);
+ RegistryBandwidthRecord rbr = null;
+
+ if (timePeriodKey != null) {
+ try {
+ long startMillis = timePeriodKey - bucketSize/2;
+ long endMillis = timePeriodKey + bucketSize/2;
+ rbr = rbd.queryByTimeRange(startMillis, endMillis);
+ } catch (DataAccessLayerException dale) {
+ statusHandler.handle(Priority.PROBLEM,
+ "Could not lookup Registry Bandwidth Record! ", dale);
+ }
+ }
+
+ return rbr;
+ }
+
+ /**
+ * Retrieve a registry bandwidth record
+ * @param cal
+ * @return
+ */
+ public RegistryBandwidthRecord getRegistryBandwidthRecord(long millis) {
+
+ Calendar cal = TimeUtil.newGmtCalendar();
+ cal.setTimeInMillis(millis);
+
+ return getRegistryBandwidthRecord(cal);
+ }
+
+ /**
+ * Get time period key for some other determined time
+ * @param cal
+ * @return
+ */
+ public Long getTimePeriodKey(Calendar cal) {
+
+ long millis = cal.getTimeInMillis();
+ BandwidthBucket bucket = bucketDao.getBucketContainingTime(millis,
+ network);
+
+ if (bucket != null) {
+ return getTimeKey(bucket.getBucketStartTime());
+ }
+
+ // in the off chance a bucket doesn't exist anywhere near this time.
+ return null;
+ }
+
+ /**
+ * conversion
+ * @param bytes
+ * @return
+ */
+ public static int convertBytesToKilobytes(int bytes) {
+ return bytes / BYTES_PER_KILOBYTE;
+ }
+
+ /**
+ * Add or update the record
+ * @param rbr
+ */
+ public void addorUpdateRecord(RegistryBandwidthRecord rbr) {
+ RegistryBandwidthDao rbd = new RegistryBandwidthDao();
+
+ try {
+ if (rbr != null) {
+ rbd.addOrUpdateRecord(rbr);
+ }
+ } catch (Exception e) {
+ statusHandler.handle(Priority.ERROR,
+ "Could not write Registry Bandwidth Record! ", e);
+ }
+ }
+
+ /**
+ * Records in the DB are kept by millis on a one day cycle
+ * @param current
+ * @return
+ */
+ private long getTimeKey(long current) {
+
+ long value = current % TimeUtil.MILLIS_PER_DAY;
+
+ if (value < 0) {
+ return 0;
+ } else {
+ return value;
+ }
+ }
+
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthUtilizationListener.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthUtilizationListener.java
new file mode 100644
index 0000000000..4635d0673f
--- /dev/null
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/registry/RegistryBandwidthUtilizationListener.java
@@ -0,0 +1,254 @@
+package com.raytheon.uf.edex.datadelivery.bandwidth.registry;
+
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name: Raytheon Company
+ * Contractor Address: 6825 Pine Street, Suite 340
+ * Mail Stop B8
+ * Omaha, NE 68106
+ * 402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.io.Buffer;
+import org.eclipse.jetty.io.NetworkTrafficListener;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SchedulerFactory;
+
+import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthMap;
+import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthRoute;
+import com.raytheon.uf.common.datadelivery.registry.Network;
+import com.raytheon.uf.common.status.IUFStatusHandler;
+import com.raytheon.uf.common.status.UFStatus;
+import com.raytheon.uf.common.status.UFStatus.Priority;
+import com.raytheon.uf.common.time.util.TimeUtil;
+import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
+
+/**
+ * {@link RegistryBandwidthUtilizationListener} Keeps track of network traffic for registry
+ *
+ *
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date Ticket# Engineer Description
+ * ------------ ---------- ----------- --------------------------
+ * Nov 06, 2013 1736 dhladky Initial creation
+ *
+ *
+ *
+ * @author dhladky
+ * @version 1.0
+ */
+
+public class RegistryBandwidthUtilizationListener implements NetworkTrafficListener {
+
+ private static final IUFStatusHandler statusHandler = UFStatus
+ .getHandler(RegistryBandwidthUtilizationListener.class);
+
+ /* Is this registry federated or not */
+ private boolean isFederated = false;
+
+ /* Total bytes collected since last run */
+ private AtomicInteger totalBytes = new AtomicInteger(0);
+
+ /* The millis of the last run */
+ private Long lastRun;
+
+ /** network for data traffic */
+ private Network network = Network.OPSNET;;
+
+ /** size of bucket in minutes */
+ private int bucketSize = 0;
+
+ /** bucket dao class */
+ private IBandwidthBucketDao bucketDao;
+
+ /**
+ * Construct the listener
+ * @param server
+ * @param isFederated
+ * @param BandwidthMap
+ */
+ public RegistryBandwidthUtilizationListener(Server server, Boolean isFederated, BandwidthMap map, IBandwidthBucketDao bucketDao) {
+
+ // We only care about OPSNET in this listener
+ this.setFederated(isFederated);
+ this.lastRun = TimeUtil.currentTimeMillis();
+ BandwidthRoute route = map.getRoute(network);
+ this.bucketSize = route.getBucketSizeMinutes();
+ this.bucketDao = bucketDao;
+ String cron = getCronString(route.getBucketSizeMinutes());
+ createQuartzCron(cron, network.name());
+
+ for (Connector connector: server.getConnectors()) {
+ if (connector instanceof NetworkTrafficSelectChannelConnector) {
+ NetworkTrafficSelectChannelConnector nconnector = ((NetworkTrafficSelectChannelConnector)connector);
+ nconnector.addNetworkTrafficListener(this);
+ statusHandler.debug(nconnector.toString()+ " on Network: "+network);
+ }
+ }
+ }
+
+
+ @Override
+ public void opened(Socket socket) {
+ if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
+ statusHandler.debug("Socket Open!");
+ }
+ }
+
+ @Override
+ public void incoming(Socket socket, Buffer bytes) {
+
+ // Ignore local traffic if federated
+ if (isFilteredTraffic(socket)) {
+ return;
+ }
+
+ // add bytes to total
+ addBytes(bytes);
+ }
+
+ @Override
+ public void outgoing(Socket socket, Buffer bytes) {
+
+ // Ignore local traffic if federated
+ if (isFilteredTraffic(socket)) {
+ return;
+ }
+
+ // add bytes to total
+ addBytes(bytes);
+ }
+
+ @Override
+ public void closed(Socket socket) {
+ if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
+ statusHandler.debug("Socket Closed!");
+ }
+ }
+
+ /**
+ * Add the bytes
+ * @param bytes
+ */
+ private void addBytes(Buffer bytes) {
+ totalBytes.getAndAdd(bytes.length());
+ }
+
+
+ public boolean isFederated() {
+ return isFederated;
+ }
+
+
+ public void setFederated(boolean isFederated) {
+ this.isFederated = isFederated;
+ }
+
+ /**
+ * Filter un-needed traffic
+ * @param socket
+ * @return
+ */
+ private boolean isFilteredTraffic(Socket socket) {
+
+ if (isFederated()) {
+
+ InetAddress address = socket.getInetAddress();
+
+ if (address.isLoopbackAddress()) {
+ // ignore this traffic
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Get the string to submit as the cron
+ * @param minutes
+ * @return
+ */
+ private String getCronString(int minutes) {
+ return new String("0 0/"+minutes+" * * * ?");
+ }
+
+ /**
+ * Creates a dynamic quartz cron for the Bandwidth Utilization based on the
+ * size of the Bandwidth Bucket in minutes.
+ * @param cron
+ * @param name
+ */
+ private void createQuartzCron(String cron, String name) {
+
+ try {
+
+ SchedulerFactory schedFactory = new org.quartz.impl.StdSchedulerFactory();
+ Scheduler schedular = schedFactory.getScheduler();
+ JobDetail jobDetail = null;
+
+ try {
+ jobDetail = schedular.getJobDetail(name, "BandwidthUtilization");
+ } catch (SchedulerException se) {
+ statusHandler.info("Job doesn't exist!");
+ }
+
+ if (jobDetail != null) {
+ // reschedule
+ CronTrigger trigger = (CronTrigger) schedular.getTrigger(name,
+ "BandwidthUtilization");
+ String cronEx = trigger.getCronExpression();
+ if (!cron.equals(cronEx)) {
+ trigger.setCronExpression(cron);
+ schedular.rescheduleJob(name, "BandwidthUtilization", trigger);
+ statusHandler.info("Rescheduling Job: " + name);
+ }
+ } else {
+ jobDetail = new JobDetail(name, "BandwidthUtilization", BandwidthUtilizationProcessor.class);
+ jobDetail.getJobDataMap().put(name, "FULL");
+ // add the atomic int to the map
+ jobDetail.getJobDataMap().put("totalBytes", totalBytes);
+ jobDetail.getJobDataMap().put("network", network);
+ jobDetail.getJobDataMap().put("lastRun", lastRun);
+ jobDetail.getJobDataMap().put("bucketDao", bucketDao);
+ jobDetail.getJobDataMap().put("bucketSize", bucketSize);
+ CronTrigger trigger = new CronTrigger(name, "BandwidthUtilization");
+ trigger.setCronExpression(cron);
+ schedular.scheduleJob(jobDetail, trigger);
+ statusHandler.info("Scheduling Job: " + name);
+ }
+
+ if (!schedular.isStarted()) {
+ schedular.start();
+ }
+
+ } catch (Exception e) {
+ statusHandler.error("Unable to schedule job: " + name + " error: "
+ + e);
+ }
+ }
+
+}
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java
index 4c7befba90..456981262d 100644
--- a/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.bandwidth/src/com/raytheon/uf/edex/datadelivery/bandwidth/retrieval/RetrievalPlan.java
@@ -11,6 +11,8 @@ import java.util.TreeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthMap;
+import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthRoute;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.status.IUFStatusHandler;
@@ -22,6 +24,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthBucketDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
+import com.raytheon.uf.edex.datadelivery.bandwidth.registry.RegistryBandwidthService;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
/**
@@ -40,6 +43,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Nov 20, 2012 1286 djohnson Handle null bucketIds being returned.
* Jun 25, 2013 2106 djohnson Separate state into other classes, promote BandwidthBucket to a class proper.
* Oct 30, 2013 2448 dhladky Moved methods to TimeUtil.
+ * Nov 16, 2013 1736 dhladky Alter size of available bandwidth by subtracting that used by registry.
*
*
*
@@ -102,6 +106,7 @@ public class RetrievalPlan {
void init() {
boolean found = false;
BandwidthRoute route = map.getRoute(network);
+
if (route != null) {
found = true;
this.planDays = route.getPlanDays();
@@ -109,28 +114,41 @@ public class RetrievalPlan {
}
if (found) {
+ // create registry bandwidth service
+ RegistryBandwidthService rbs = new RegistryBandwidthService(bucketsDao, network, bucketMinutes);
+ long bucketMillis = bucketMinutes * TimeUtil.MILLIS_PER_MINUTE;
Calendar currentBucket = BandwidthUtil.now();
planStart = BandwidthUtil.now();
planEnd = TimeUtil.newCalendar(planStart);
planEnd.add(Calendar.DAY_OF_YEAR, planDays);
-
+ long currentMillis = currentBucket.getTimeInMillis();
+ long planEndMillis = planEnd.getTimeInMillis();
// Make the buckets...
- long bucket = 0;
- while (!currentBucket.after(planEnd)) {
+
+ while (!(currentMillis > planEndMillis)) {
+
int bw = map.getBandwidth(network, currentBucket);
- bucket = currentBucket.getTimeInMillis();
// Get the bucket size..
- // buckets are (bandwidth [kilobits/second] * milliseconds per
+ // buckets are (bandwidth [kilobytes/second] * milliseconds per
// minute *
// bucket minutes)/bits per byte) ...
bytesPerBucket = BandwidthUtil
.convertKilobytesPerSecondToBytesPerSpecifiedMinutes(
bw, bucketMinutes);
- bucketsDao.create(new BandwidthBucket(bucket, bytesPerBucket,
+ bucketsDao.create(new BandwidthBucket(currentMillis, bytesPerBucket,
network));
- currentBucket.add(Calendar.MINUTE, bucketMinutes);
+
+ currentMillis += bucketMillis;
}
+
+ // subtract registry traffic from total available bytes/per second
+ for (BandwidthBucket bucket: bucketsDao.getAll(network)) {
+ long startMillis = bucket.getBucketStartTime();
+ int registryBytesPerSecond = rbs.getRegistryBandwidth(startMillis);
+ bucket.setBucketSize(bucket.getBucketSize() - (registryBytesPerSecond * TimeUtil.SECONDS_PER_MINUTE * bucketMinutes));
+ }
+
} else {
// Can't proceed, throw an Exception
throw new IllegalArgumentException(
@@ -212,33 +230,41 @@ public class RetrievalPlan {
// Get the last bucket and add buckets to make up the
// difference..
// Make the buckets...
+ long bucketMillis = bucketMinutes * TimeUtil.MILLIS_PER_MINUTE;
+ long newPlanEndMillis = newEndOfPlan.getTimeInMillis();
BandwidthBucket bucket = bucketsDao.getLastBucket(network);
Calendar currentBucket = BandwidthUtil.now();
currentBucket.setTimeInMillis(bucket.getBucketStartTime());
+ long currentBucketMillis = bucket.getBucketStartTime();
// Add the buckets minutes to the last bucket and add
// buckets until we have the new plan size.
- currentBucket.add(Calendar.MINUTE, bucketMinutes);
-
- while (!currentBucket.after(newEndOfPlan)) {
+ currentBucketMillis += bucketMillis;
+ // create Registry Bandwidth Service
+ RegistryBandwidthService rbs = new RegistryBandwidthService(bucketsDao, network, bucketMinutes);
+
+ while (!(currentBucketMillis > newPlanEndMillis)) {
+
int bw = map.getBandwidth(network, currentBucket);
- long bucketStartTime = currentBucket.getTimeInMillis();
+ // subtract registry traffic from total available bytes/per second
+ int registryBytesPerSecond = rbs.getRegistryBandwidth(currentBucketMillis);
+ bw = bw - registryBytesPerSecond;
// Get the bucket size..
- // buckets are (bandwidth * kilobits/second * 60 seconds *
+ // buckets are (bandwidth * kilobytes/second * 60 seconds *
// bucket minutes)/bits per byte) ...
bytesPerBucket = BandwidthUtil
.convertKilobytesPerSecondToBytesPerSpecifiedMinutes(
bw, bucketMinutes);
- bucketsDao.create(new BandwidthBucket(bucketStartTime,
+ bucketsDao.create(new BandwidthBucket(currentBucketMillis,
bytesPerBucket, network));
- currentBucket.add(Calendar.MINUTE, bucketMinutes);
+
+ currentBucketMillis += bucketMillis;
statusHandler.info("resize() - Adding bucket [" + bucket
+ "] bandwidth = [" + bw + "]");
}
}
-
}
- // Now remove buckets from the front of the map whos time slot
+ // Now remove buckets from the front of the map who's time slot
// is past and are empty
long newStart = newStartOfPlan.getTimeInMillis();
diff --git a/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/META-INF/MANIFEST.MF b/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/META-INF/MANIFEST.MF
index 5853e0db16..67f14aed98 100644
--- a/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/META-INF/MANIFEST.MF
+++ b/edexOsgi/com.raytheon.uf.edex.datadelivery.registry/META-INF/MANIFEST.MF
@@ -23,3 +23,7 @@ Require-Bundle: com.raytheon.uf.common.registry.schemas.ebxml;bundle-version="1.
com.raytheon.uf.common.datadelivery.registry;bundle-version="1.0.0",
org.apache.commons.lang;bundle-version="2.3.0",
com.raytheon.uf.common.registry.event;bundle-version="1.0.0"
+Export-Package: com.raytheon.uf.edex.datadelivery.registry.availability,
+ com.raytheon.uf.edex.datadelivery.registry.federation,
+ com.raytheon.uf.edex.datadelivery.registry.replication,
+ com.raytheon.uf.edex.datadelivery.registry.web
diff --git a/edexOsgi/com.raytheon.uf.edex.registry.ebxml/res/spring/ebxml-webserver.xml b/edexOsgi/com.raytheon.uf.edex.registry.ebxml/res/spring/ebxml-webserver.xml
index ac83bf6582..36c7c08d7f 100644
--- a/edexOsgi/com.raytheon.uf.edex.registry.ebxml/res/spring/ebxml-webserver.xml
+++ b/edexOsgi/com.raytheon.uf.edex.registry.ebxml/res/spring/ebxml-webserver.xml
@@ -14,7 +14,7 @@
-
+
diff --git a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthServiceIntTest.java b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthServiceIntTest.java
index a0d4415a87..57ca7c7e95 100644
--- a/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthServiceIntTest.java
+++ b/tests/integration/com/raytheon/uf/edex/datadelivery/bandwidth/BandwidthServiceIntTest.java
@@ -44,6 +44,7 @@ import com.raytheon.uf.common.datadelivery.bandwidth.IProposeScheduleResponse;
import com.raytheon.uf.common.datadelivery.bandwidth.WfoBandwidthService;
import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthBucketDescription;
import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthGraphData;
+import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthMap;
import com.raytheon.uf.common.datadelivery.bandwidth.data.SubscriptionStatusSummary;
import com.raytheon.uf.common.datadelivery.bandwidth.data.TimeWindowData;
import com.raytheon.uf.common.datadelivery.registry.AdhocSubscription;
@@ -63,7 +64,6 @@ import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthBucket;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
-import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
diff --git a/tests/unit/com/raytheon/uf/common/serialization/TestJaxbableClassesLocator.java b/tests/unit/com/raytheon/uf/common/serialization/TestJaxbableClassesLocator.java
index 8f166c7a47..4328a5cc6b 100644
--- a/tests/unit/com/raytheon/uf/common/serialization/TestJaxbableClassesLocator.java
+++ b/tests/unit/com/raytheon/uf/common/serialization/TestJaxbableClassesLocator.java
@@ -95,7 +95,7 @@ public class TestJaxbableClassesLocator implements IJaxbableClassesLocator {
com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate.class,
com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription.class,
com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation.class,
- com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap.class,
+ com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthMap.class,
com.raytheon.uf.common.datadelivery.retrieval.xml.ServiceConfig.class,
com.raytheon.uf.common.datadelivery.retrieval.xml.UnitLookup.class,
com.raytheon.uf.common.datadelivery.retrieval.xml.LevelLookup.class,
diff --git a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtilTest.java b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtilTest.java
index 86e18030a2..0b6cbe927f 100644
--- a/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtilTest.java
+++ b/tests/unit/com/raytheon/uf/edex/datadelivery/bandwidth/util/BandwidthDaoUtilTest.java
@@ -44,6 +44,7 @@ import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Maps;
+import com.raytheon.uf.common.datadelivery.bandwidth.data.BandwidthMap;
import com.raytheon.uf.common.datadelivery.registry.GriddedTime;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
@@ -61,7 +62,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.InMemoryBandwidthBucketDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthSubscription;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
-import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.BandwidthMap;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.InMemoryBandwidthBucketAllocationAssociator;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;