Merge "Issue #3707 Reworked gridded subscription triggereing for retrievals. Change-Id: I4abded9b531c7b7e9b9058760b97b8f5e778d176" into omaha_14.3.1
Former-commit-id:dcce72b823
[formerlydcce72b823
[formerly 57848f31cc73d81d16595e4e5b01949bcff12a24]] Former-commit-id:cfb24faf68
Former-commit-id:6593a843e4
This commit is contained in:
commit
c73551e8ac
5 changed files with 112 additions and 178 deletions
|
@ -50,6 +50,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
|
|||
* Oct 23, 2013 2484 dhladky Unique ID for subscriptions updated.
|
||||
* Nov 14, 2013 2548 mpduff Add a subscription type slot.
|
||||
* jan 23, 2013 2584 dhladky Versions.
|
||||
* Jul 28, 2014 2752 dhladky Recurring constructor for Adhocs so that Shareds can have adhocs.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -73,8 +74,9 @@ public class AdhocSubscription<T extends Time, C extends Coverage> extends
|
|||
setGroupName("Adhoc");
|
||||
}
|
||||
|
||||
public AdhocSubscription(SiteSubscription<T, C> subscription) {
|
||||
public AdhocSubscription(RecurringSubscription<T, C> subscription) {
|
||||
super(subscription);
|
||||
this.setOwner(subscription.getOwner());
|
||||
setGroupName("Adhoc");
|
||||
}
|
||||
|
||||
|
|
|
@ -65,7 +65,8 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
|||
* Sept 30, 2013 1797 dhladky Some Generics
|
||||
* Oct 23, 2013 2484 dhladky Unique ID for subscriptions updated.
|
||||
* Nov 14, 2013 2548 mpduff Add a subscription type slot.
|
||||
* Dec 08, 2013 2584 dhladky Version update
|
||||
* Dec 08, 2013 2584 dhladky Version update
|
||||
* Jul 28, 2014 2752 dhladky Recurring constructor from SiteSubscription
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -90,6 +91,14 @@ public class SiteSubscription<T extends Time, C extends Coverage> extends
|
|||
public SiteSubscription() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor used for Adhocs
|
||||
* @param sub
|
||||
*/
|
||||
public SiteSubscription(RecurringSubscription<T, C> sub) {
|
||||
super(sub);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialization constructor.
|
||||
|
|
|
@ -150,6 +150,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.RegistryIdUtil;
|
|||
* Apr 02, 2014 2810 dhladky Priority sorting of subscriptions.
|
||||
* Apr 09, 2014 3012 dhladky Range the querries for metadata checks to subscriptions.
|
||||
* Apr 22, 2014 2992 dhladky Ability to get list of all registry nodes containing data.
|
||||
* Oct 08, 2014 3707 dhladky Changed the way Gridded subscriptions are triggered.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -573,12 +574,13 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
*/
|
||||
@Override
|
||||
public List<BandwidthAllocation> scheduleAdhoc(
|
||||
AdhocSubscription<T, C> subscription, Calendar now) {
|
||||
AdhocSubscription<T, C> subscription, Calendar baseReferenceTime) {
|
||||
|
||||
List<BandwidthSubscription> subscriptions = new ArrayList<BandwidthSubscription>();
|
||||
// Store the AdhocSubscription with a base time of now..
|
||||
subscriptions.add(bandwidthDao.newBandwidthSubscription(subscription,
|
||||
now));
|
||||
baseReferenceTime));
|
||||
|
||||
/**
|
||||
* This check allows for retrieval of data older than current for grid.
|
||||
* This is not allowed for pointdata types, they must grab current URL
|
||||
|
@ -600,8 +602,8 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
subscriptions));
|
||||
|
||||
for (SubscriptionRetrieval retrieval : retrievals) {
|
||||
retrieval.setStartTime(now);
|
||||
Calendar endTime = TimeUtil.newCalendar(now);
|
||||
retrieval.setStartTime(baseReferenceTime);
|
||||
Calendar endTime = TimeUtil.newCalendar(baseReferenceTime);
|
||||
endTime.add(Calendar.MINUTE, retrieval.getSubscriptionLatency());
|
||||
retrieval.setEndTime(endTime);
|
||||
// Store the SubscriptionRetrieval - retrievalManager expects
|
||||
|
@ -798,9 +800,9 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
// Create an adhoc subscription based on the new subscription,
|
||||
// and set it to retrieve the most recent cycle (or most recent
|
||||
// url if a daily product)
|
||||
if (subscription instanceof SiteSubscription && subscription.isActive()) {
|
||||
if (subscription instanceof RecurringSubscription && subscription.isActive()) {
|
||||
AdhocSubscription<T, C> adhoc = new AdhocSubscription<T, C>(
|
||||
(SiteSubscription<T, C>) subscription);
|
||||
(RecurringSubscription<T, C>) subscription);
|
||||
adhoc = bandwidthDaoUtil.setAdhocMostRecentUrlAndTime(adhoc,
|
||||
useMostRecentDataSetUpdate);
|
||||
|
||||
|
@ -834,10 +836,7 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
statusHandler
|
||||
.warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future...");
|
||||
}
|
||||
}
|
||||
return unscheduled;
|
||||
}
|
||||
|
||||
|
@ -1745,27 +1744,4 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
|
|||
|
||||
return dataSetMetaDataTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a range based on the baseReferenceTime hour.
|
||||
* @param baseReferenceTime
|
||||
* @return
|
||||
*/
|
||||
public static Map<String, Date> getBaseReferenceTimeDateRange(Calendar baseReferenceTime) {
|
||||
|
||||
Map<String, Date> dates = new HashMap<String, Date>(2);
|
||||
// Set min range to baseReferenceTime hour "00" minutes, "00" seconds
|
||||
// Set max range to baseReferenceTime hour "59" minutes, "59" seconds
|
||||
Calendar min = TimeUtil.newGmtCalendar(baseReferenceTime.getTime());
|
||||
min.set(Calendar.MINUTE, 0);
|
||||
min.set(Calendar.SECOND, 0);
|
||||
Calendar max = TimeUtil.newGmtCalendar(baseReferenceTime.getTime());
|
||||
max.set(Calendar.MINUTE, 59);
|
||||
max.set(Calendar.SECOND, 59);
|
||||
|
||||
dates.put(MIN_RANGE_TIME, min.getTime());
|
||||
dates.put(MAX_RANGE_TIME, max.getTime());
|
||||
|
||||
return dates;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData;
|
|||
import com.raytheon.uf.common.datadelivery.registry.Network;
|
||||
import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData;
|
||||
import com.raytheon.uf.common.datadelivery.registry.PointTime;
|
||||
import com.raytheon.uf.common.datadelivery.registry.SharedSubscription;
|
||||
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Subscription;
|
||||
import com.raytheon.uf.common.datadelivery.registry.Time;
|
||||
|
@ -59,6 +60,7 @@ import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHan
|
|||
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
|
||||
import com.raytheon.uf.common.datadelivery.service.ISubscriptionNotificationService;
|
||||
import com.raytheon.uf.common.event.EventBus;
|
||||
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
|
||||
import com.raytheon.uf.common.registry.ebxml.encoder.RegistryEncoders;
|
||||
import com.raytheon.uf.common.registry.event.InsertRegistryEvent;
|
||||
import com.raytheon.uf.common.registry.event.RegistryEvent;
|
||||
|
@ -74,7 +76,6 @@ import com.raytheon.uf.common.util.CollectionUtil;
|
|||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.common.util.IFileModifiedWatcher;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
|
||||
|
@ -86,7 +87,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
|
|||
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
|
||||
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
|
||||
import com.raytheon.uf.edex.registry.ebxml.util.RegistryIdUtil;
|
||||
|
||||
/**
|
||||
|
@ -130,6 +130,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.RegistryIdUtil;
|
|||
* Mar 31, 2014 2889 dhladky Added username for notification center tracking.
|
||||
* Apr 09, 2014 3012 dhladky Range the queries for metadata checks, adhoc firing prevention.
|
||||
* Apr 22, 2014 2992 dhladky Added IdUtil for siteList
|
||||
* Oct 08, 2014 3707 dhladky Changed the way Gridded subscriptions are triggered.
|
||||
* </pre>
|
||||
*
|
||||
* @author djohnson
|
||||
|
@ -493,16 +494,85 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
|
|||
@Subscribe
|
||||
public void updateGriddedDataSetMetaData(
|
||||
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
|
||||
|
||||
// Daily/Hourly/Monthly datasets
|
||||
if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) {
|
||||
updateDataSetMetaDataWithoutCycle((DataSetMetaData<T>) dataSetMetaData);
|
||||
|
||||
/*
|
||||
* Looking for active subscriptions to the dataset. #3707 Simplified the
|
||||
* triggering mechanism for Gridded subs. It all but guarantees the
|
||||
* retrieval of a given subscription subscribed to a given dataset. This
|
||||
* should be the core concept of the #2414 BandwidthManager re-design.
|
||||
*/
|
||||
|
||||
try {
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
List<Subscription> subscriptions = subscriptionHandler
|
||||
.getActiveByDataSetAndProvider(
|
||||
dataSetMetaData.getDataSetName(),
|
||||
dataSetMetaData.getProviderName());
|
||||
|
||||
if (subscriptions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
statusHandler.info(String.format(
|
||||
"Found [%s] subscriptions subscribed to "
|
||||
+ "this dataset, url [%s].", subscriptions.size(),
|
||||
dataSetMetaData.getUrl()));
|
||||
|
||||
// Create an adhoc for each one, and schedule it
|
||||
for (Subscription<T, C> subscription : subscriptions) {
|
||||
|
||||
// both of these are handled identically,
|
||||
// The only difference is logging.
|
||||
|
||||
if (subscription instanceof SiteSubscription) {
|
||||
if (subscription.getOfficeIDs().contains(
|
||||
RegistryIdUtil.getId())) {
|
||||
|
||||
Subscription<T, C> sub = updateSubscriptionWithDataSetMetaData(
|
||||
subscription, dataSetMetaData);
|
||||
statusHandler
|
||||
.info("Updating subscription metadata: "
|
||||
+ sub.getName()
|
||||
+ " dataSetMetadata: "
|
||||
+ sub.getDataSetName()
|
||||
+ " scheduling SITE subscription for retrieval.");
|
||||
|
||||
scheduleAdhoc(new AdhocSubscription<T, C>(
|
||||
(SiteSubscription<T, C>) sub));
|
||||
} else {
|
||||
// Fall through! doesn't belong to this site, so we
|
||||
// won't retrieve it.
|
||||
}
|
||||
|
||||
} else if (subscription instanceof SharedSubscription) {
|
||||
// check to see if this site is the NCF
|
||||
if (RegistryIdUtil.getId().equals(RegistryUtil.defaultUser)) {
|
||||
|
||||
Subscription<T, C> sub = updateSubscriptionWithDataSetMetaData(
|
||||
subscription, dataSetMetaData);
|
||||
statusHandler
|
||||
.info("Updating subscription metadata: "
|
||||
+ sub.getName()
|
||||
+ " dataSetMetadata: "
|
||||
+ sub.getDataSetName()
|
||||
+ " scheduling SHARED subscription for retrieval.");
|
||||
scheduleAdhoc(new AdhocSubscription<T, C>(
|
||||
(SharedSubscription<T, C>) sub));
|
||||
} else {
|
||||
// Fall through! doesn't belong to this site, so we
|
||||
// won't retrieve it.
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Unexpected state: Subscription type other than Shared or Site encountered! "
|
||||
+ subscription.getName());
|
||||
}
|
||||
}
|
||||
} catch (RegistryHandlerException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Failed to lookup subscriptions.", e);
|
||||
}
|
||||
// Regular cycle containing datasets
|
||||
else {
|
||||
updateDataSetMetaDataWithCycle((DataSetMetaData<T>) dataSetMetaData);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -564,6 +634,12 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
|
|||
if (pointTimeEnd.before(earliestRetrievalDataTime)
|
||||
|| pointTimeStart.after(latestRetrievalDataTime)) {
|
||||
continue;
|
||||
} else {
|
||||
statusHandler.info("Retrieval: " + retrieval.toString()
|
||||
+ " Outside the range: MIN: "
|
||||
+ earliestRetrievalDataTime.toString() + " MAX: "
|
||||
+ latestRetrievalDataTime.toString()
|
||||
+ " \n No retrieval will be produced!");
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -606,136 +682,7 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles updates for datasets that do not contain cycles.
|
||||
*
|
||||
* @param dataSetMetaData
|
||||
* the dataset metadata
|
||||
* @throws ParseException
|
||||
* on parsing errors
|
||||
*/
|
||||
private void updateDataSetMetaDataWithoutCycle(
|
||||
DataSetMetaData<T> dataSetMetaData) throws ParseException {
|
||||
bandwidthDao.newBandwidthDataSetUpdate(dataSetMetaData);
|
||||
|
||||
// Looking for active subscriptions to the dataset.
|
||||
try {
|
||||
@SuppressWarnings("rawtypes")
|
||||
List<Subscription> subscriptions = subscriptionHandler
|
||||
.getActiveByDataSetAndProvider(
|
||||
dataSetMetaData.getDataSetName(),
|
||||
dataSetMetaData.getProviderName());
|
||||
|
||||
if (subscriptions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
statusHandler
|
||||
.info(String
|
||||
.format("Found [%s] subscriptions that will have an "
|
||||
+ "adhoc subscription generated and scheduled for url [%s].",
|
||||
subscriptions.size(),
|
||||
dataSetMetaData.getUrl()));
|
||||
|
||||
// Create an adhoc for each one, and schedule it
|
||||
for (Subscription<T, C> subscription : subscriptions) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Subscription<T, C> sub = updateSubscriptionWithDataSetMetaData(
|
||||
subscription, dataSetMetaData);
|
||||
|
||||
if (sub instanceof SiteSubscription) {
|
||||
scheduleAdhoc(new AdhocSubscription<T, C>(
|
||||
(SiteSubscription<T, C>) sub));
|
||||
} else {
|
||||
statusHandler
|
||||
.warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future...");
|
||||
}
|
||||
}
|
||||
} catch (RegistryHandlerException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Failed to lookup subscriptions.", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles updates for datasets that contain cycles.
|
||||
*
|
||||
* @param dataSetMetaData
|
||||
* the dataset metadata
|
||||
* @throws ParseException
|
||||
* on parsing errors
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void updateDataSetMetaDataWithCycle(
|
||||
DataSetMetaData<T> dataSetMetaData) throws ParseException {
|
||||
BandwidthDataSetUpdate dataset = bandwidthDao
|
||||
.newBandwidthDataSetUpdate(dataSetMetaData);
|
||||
|
||||
// Range the query for subscriptions within the baseReferenceTime hour.
|
||||
// SOME models, RAP and RTMA, come not exactly on the hour. This causes these
|
||||
// subscriptions to be missed because baseReferenceTimes are on the hour.
|
||||
Map<String, Date> timeRange = getBaseReferenceTimeDateRange(dataset.getDataSetBaseTime());
|
||||
|
||||
final SortedSet<SubscriptionRetrieval> subscriptions = bandwidthDao
|
||||
.getSubscriptionRetrievals(dataset.getProviderName(), dataset.getDataSetName(),
|
||||
RetrievalStatus.SCHEDULED, timeRange.get(MIN_RANGE_TIME), timeRange.get(MAX_RANGE_TIME));
|
||||
|
||||
if (!subscriptions.isEmpty()) {
|
||||
// Loop through the scheduled SubscriptionRetrievals and mark
|
||||
// the scheduled retrievals as ready for retrieval
|
||||
for (SubscriptionRetrieval retrieval : subscriptions) {
|
||||
|
||||
if (RetrievalStatus.SCHEDULED.equals(retrieval.getStatus())) {
|
||||
// Need to update the Subscription Object in the
|
||||
// SubscriptionRetrieval with the current DataSetMetaData
|
||||
// URL and time Object
|
||||
|
||||
SubscriptionRetrievalAttributes<T, C> attributes = bandwidthDao
|
||||
.getSubscriptionRetrievalAttributes(retrieval);
|
||||
|
||||
Subscription<T, C> sub;
|
||||
try {
|
||||
sub = updateSubscriptionWithDataSetMetaData(
|
||||
attributes.getSubscription(), dataSetMetaData);
|
||||
|
||||
// Update the SubscriptionRetrieval record with the new
|
||||
// data...
|
||||
attributes.setSubscription(sub);
|
||||
|
||||
bandwidthDao.update(attributes);
|
||||
} catch (SerializationException e) {
|
||||
statusHandler
|
||||
.handle(Priority.PROBLEM,
|
||||
"Unable to serialize the subscription for the retrieval, skipping...",
|
||||
e);
|
||||
continue;
|
||||
}
|
||||
|
||||
retrieval.setStatus(RetrievalStatus.READY);
|
||||
|
||||
bandwidthDaoUtil.update(retrieval);
|
||||
|
||||
statusHandler.info(String.format(
|
||||
"Updated retrieval [%s] for "
|
||||
+ "subscription [%s] to use "
|
||||
+ "url [%s] and "
|
||||
+ "base reference time [%s]",
|
||||
retrieval.getIdentifier(), sub.getName(),
|
||||
dataSetMetaData.getUrl(),
|
||||
BandwidthUtil.format(sub.getTime().getStart())));
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
statusHandler
|
||||
.debug("No Subscriptions scheduled for BandwidthDataSetUpdate ["
|
||||
+ dataset.getIdentifier()
|
||||
+ "] base time ["
|
||||
+ BandwidthUtil.format(dataset.getDataSetBaseTime())
|
||||
+ "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
@ -894,6 +841,5 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
|
|||
t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.Sta
|
|||
* Feb 07, 2013 1543 djohnson Use session management code.
|
||||
* Feb 13, 2013 1543 djohnson Exported interface which is now implemented.
|
||||
* Feb 22, 2013 1543 djohnson Made public as YAJSW doesn't like Spring exceptions.
|
||||
* Oct 13, 2014 3707 dhladky Shared subscription delivery requires you to create a new record.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -165,7 +166,7 @@ public class RetrievalDao extends
|
|||
public void completeRetrievalRequest(RetrievalRequestRecord rec)
|
||||
throws DataAccessLayerException {
|
||||
try {
|
||||
update(rec);
|
||||
createOrUpdate(rec);
|
||||
} catch (HibernateException e) {
|
||||
throw new DataAccessLayerException(
|
||||
"Failed to update the database while changing the status on ["
|
||||
|
|
Loading…
Add table
Reference in a new issue