Issue #3707 Reworked gridded subscription triggereing for retrievals.

Change-Id: I4abded9b531c7b7e9b9058760b97b8f5e778d176

Former-commit-id: a3268ae60d [formerly a3268ae60d [formerly eb64bd232cd25a71dbac655f4d59f9ec10b841b4]]
Former-commit-id: c7df290186
Former-commit-id: 54e397a561
This commit is contained in:
Dave Hladky 2014-10-17 11:29:21 -05:00
parent 41729296aa
commit 04aa204dc1
5 changed files with 112 additions and 178 deletions

View file

@ -50,6 +50,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
* Oct 23, 2013 2484 dhladky Unique ID for subscriptions updated.
* Nov 14, 2013 2548 mpduff Add a subscription type slot.
* jan 23, 2013 2584 dhladky Versions.
* Jul 28, 2014 2752 dhladky Recurring constructor for Adhocs so that Shareds can have adhocs.
*
* </pre>
*
@ -73,8 +74,9 @@ public class AdhocSubscription<T extends Time, C extends Coverage> extends
setGroupName("Adhoc");
}
public AdhocSubscription(SiteSubscription<T, C> subscription) {
public AdhocSubscription(RecurringSubscription<T, C> subscription) {
super(subscription);
this.setOwner(subscription.getOwner());
setGroupName("Adhoc");
}

View file

@ -66,6 +66,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Oct 23, 2013 2484 dhladky Unique ID for subscriptions updated.
* Nov 14, 2013 2548 mpduff Add a subscription type slot.
* Dec 08, 2013 2584 dhladky Version update
* Jul 28, 2014 2752 dhladky Recurring constructor from SiteSubscription
*
* </pre>
*
@ -91,6 +92,14 @@ public class SiteSubscription<T extends Time, C extends Coverage> extends
}
/**
* Constructor used for Adhocs
* @param sub
*/
public SiteSubscription(RecurringSubscription<T, C> sub) {
super(sub);
}
/**
* Initialization constructor.
*

View file

@ -150,6 +150,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.RegistryIdUtil;
* Apr 02, 2014 2810 dhladky Priority sorting of subscriptions.
* Apr 09, 2014 3012 dhladky Range the querries for metadata checks to subscriptions.
* Apr 22, 2014 2992 dhladky Ability to get list of all registry nodes containing data.
* Oct 08, 2014 3707 dhladky Changed the way Gridded subscriptions are triggered.
*
* </pre>
*
@ -573,12 +574,13 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
*/
@Override
public List<BandwidthAllocation> scheduleAdhoc(
AdhocSubscription<T, C> subscription, Calendar now) {
AdhocSubscription<T, C> subscription, Calendar baseReferenceTime) {
List<BandwidthSubscription> subscriptions = new ArrayList<BandwidthSubscription>();
// Store the AdhocSubscription with a base time of now..
subscriptions.add(bandwidthDao.newBandwidthSubscription(subscription,
now));
baseReferenceTime));
/**
* This check allows for retrieval of data older than current for grid.
* This is not allowed for pointdata types, they must grab current URL
@ -600,8 +602,8 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
subscriptions));
for (SubscriptionRetrieval retrieval : retrievals) {
retrieval.setStartTime(now);
Calendar endTime = TimeUtil.newCalendar(now);
retrieval.setStartTime(baseReferenceTime);
Calendar endTime = TimeUtil.newCalendar(baseReferenceTime);
endTime.add(Calendar.MINUTE, retrieval.getSubscriptionLatency());
retrieval.setEndTime(endTime);
// Store the SubscriptionRetrieval - retrievalManager expects
@ -798,9 +800,9 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
// Create an adhoc subscription based on the new subscription,
// and set it to retrieve the most recent cycle (or most recent
// url if a daily product)
if (subscription instanceof SiteSubscription && subscription.isActive()) {
if (subscription instanceof RecurringSubscription && subscription.isActive()) {
AdhocSubscription<T, C> adhoc = new AdhocSubscription<T, C>(
(SiteSubscription<T, C>) subscription);
(RecurringSubscription<T, C>) subscription);
adhoc = bandwidthDaoUtil.setAdhocMostRecentUrlAndTime(adhoc,
useMostRecentDataSetUpdate);
@ -834,9 +836,6 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
}
}
}
} else {
statusHandler
.warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future...");
}
return unscheduled;
}
@ -1745,27 +1744,4 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
return dataSetMetaDataTime;
}
/**
* Sets a range based on the baseReferenceTime hour.
* @param baseReferenceTime
* @return
*/
public static Map<String, Date> getBaseReferenceTimeDateRange(Calendar baseReferenceTime) {
Map<String, Date> dates = new HashMap<String, Date>(2);
// Set min range to baseReferenceTime hour "00" minutes, "00" seconds
// Set max range to baseReferenceTime hour "59" minutes, "59" seconds
Calendar min = TimeUtil.newGmtCalendar(baseReferenceTime.getTime());
min.set(Calendar.MINUTE, 0);
min.set(Calendar.SECOND, 0);
Calendar max = TimeUtil.newGmtCalendar(baseReferenceTime.getTime());
max.set(Calendar.MINUTE, 59);
max.set(Calendar.SECOND, 59);
dates.put(MIN_RANGE_TIME, min.getTime());
dates.put(MAX_RANGE_TIME, max.getTime());
return dates;
}
}

View file

@ -51,6 +51,7 @@ import com.raytheon.uf.common.datadelivery.registry.GriddedDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.datadelivery.registry.PointDataSetMetaData;
import com.raytheon.uf.common.datadelivery.registry.PointTime;
import com.raytheon.uf.common.datadelivery.registry.SharedSubscription;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscription;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.datadelivery.registry.Time;
@ -59,6 +60,7 @@ import com.raytheon.uf.common.datadelivery.registry.handlers.IDataSetMetaDataHan
import com.raytheon.uf.common.datadelivery.registry.handlers.ISubscriptionHandler;
import com.raytheon.uf.common.datadelivery.service.ISubscriptionNotificationService;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.registry.ebxml.encoder.RegistryEncoders;
import com.raytheon.uf.common.registry.event.InsertRegistryEvent;
import com.raytheon.uf.common.registry.event.RegistryEvent;
@ -74,7 +76,6 @@ import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.IFileModifiedWatcher;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthAllocation;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.BandwidthDataSetUpdate;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDao;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.SubscriptionRetrieval;
@ -86,7 +87,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalPlan;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalStatus;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.SubscriptionRetrievalFulfilled;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
import com.raytheon.uf.edex.registry.ebxml.util.RegistryIdUtil;
/**
@ -130,6 +130,7 @@ import com.raytheon.uf.edex.registry.ebxml.util.RegistryIdUtil;
* Mar 31, 2014 2889 dhladky Added username for notification center tracking.
* Apr 09, 2014 3012 dhladky Range the queries for metadata checks, adhoc firing prevention.
* Apr 22, 2014 2992 dhladky Added IdUtil for siteList
* Oct 08, 2014 3707 dhladky Changed the way Gridded subscriptions are triggered.
* </pre>
*
* @author djohnson
@ -494,15 +495,84 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
public void updateGriddedDataSetMetaData(
GriddedDataSetMetaData dataSetMetaData) throws ParseException {
// Daily/Hourly/Monthly datasets
if (dataSetMetaData.getCycle() == GriddedDataSetMetaData.NO_CYCLE) {
updateDataSetMetaDataWithoutCycle((DataSetMetaData<T>) dataSetMetaData);
}
// Regular cycle containing datasets
else {
updateDataSetMetaDataWithCycle((DataSetMetaData<T>) dataSetMetaData);
/*
* Looking for active subscriptions to the dataset. #3707 Simplified the
* triggering mechanism for Gridded subs. It all but guarantees the
* retrieval of a given subscription subscribed to a given dataset. This
* should be the core concept of the #2414 BandwidthManager re-design.
*/
try {
@SuppressWarnings("rawtypes")
List<Subscription> subscriptions = subscriptionHandler
.getActiveByDataSetAndProvider(
dataSetMetaData.getDataSetName(),
dataSetMetaData.getProviderName());
if (subscriptions.isEmpty()) {
return;
}
statusHandler.info(String.format(
"Found [%s] subscriptions subscribed to "
+ "this dataset, url [%s].", subscriptions.size(),
dataSetMetaData.getUrl()));
// Create an adhoc for each one, and schedule it
for (Subscription<T, C> subscription : subscriptions) {
// both of these are handled identically,
// The only difference is logging.
if (subscription instanceof SiteSubscription) {
if (subscription.getOfficeIDs().contains(
RegistryIdUtil.getId())) {
Subscription<T, C> sub = updateSubscriptionWithDataSetMetaData(
subscription, dataSetMetaData);
statusHandler
.info("Updating subscription metadata: "
+ sub.getName()
+ " dataSetMetadata: "
+ sub.getDataSetName()
+ " scheduling SITE subscription for retrieval.");
scheduleAdhoc(new AdhocSubscription<T, C>(
(SiteSubscription<T, C>) sub));
} else {
// Fall through! doesn't belong to this site, so we
// won't retrieve it.
}
} else if (subscription instanceof SharedSubscription) {
// check to see if this site is the NCF
if (RegistryIdUtil.getId().equals(RegistryUtil.defaultUser)) {
Subscription<T, C> sub = updateSubscriptionWithDataSetMetaData(
subscription, dataSetMetaData);
statusHandler
.info("Updating subscription metadata: "
+ sub.getName()
+ " dataSetMetadata: "
+ sub.getDataSetName()
+ " scheduling SHARED subscription for retrieval.");
scheduleAdhoc(new AdhocSubscription<T, C>(
(SharedSubscription<T, C>) sub));
} else {
// Fall through! doesn't belong to this site, so we
// won't retrieve it.
}
} else {
throw new IllegalArgumentException(
"Unexpected state: Subscription type other than Shared or Site encountered! "
+ subscription.getName());
}
}
} catch (RegistryHandlerException e) {
statusHandler.handle(Priority.PROBLEM,
"Failed to lookup subscriptions.", e);
}
}
/**
@ -564,6 +634,12 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
if (pointTimeEnd.before(earliestRetrievalDataTime)
|| pointTimeStart.after(latestRetrievalDataTime)) {
continue;
} else {
statusHandler.info("Retrieval: " + retrieval.toString()
+ " Outside the range: MIN: "
+ earliestRetrievalDataTime.toString() + " MAX: "
+ latestRetrievalDataTime.toString()
+ " \n No retrieval will be produced!");
}
try {
@ -606,135 +682,6 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
}
}
/**
* Handles updates for datasets that do not contain cycles.
*
* @param dataSetMetaData
* the dataset metadata
* @throws ParseException
* on parsing errors
*/
private void updateDataSetMetaDataWithoutCycle(
DataSetMetaData<T> dataSetMetaData) throws ParseException {
bandwidthDao.newBandwidthDataSetUpdate(dataSetMetaData);
// Looking for active subscriptions to the dataset.
try {
@SuppressWarnings("rawtypes")
List<Subscription> subscriptions = subscriptionHandler
.getActiveByDataSetAndProvider(
dataSetMetaData.getDataSetName(),
dataSetMetaData.getProviderName());
if (subscriptions.isEmpty()) {
return;
}
statusHandler
.info(String
.format("Found [%s] subscriptions that will have an "
+ "adhoc subscription generated and scheduled for url [%s].",
subscriptions.size(),
dataSetMetaData.getUrl()));
// Create an adhoc for each one, and schedule it
for (Subscription<T, C> subscription : subscriptions) {
@SuppressWarnings("unchecked")
Subscription<T, C> sub = updateSubscriptionWithDataSetMetaData(
subscription, dataSetMetaData);
if (sub instanceof SiteSubscription) {
scheduleAdhoc(new AdhocSubscription<T, C>(
(SiteSubscription<T, C>) sub));
} else {
statusHandler
.warn("Unable to create adhoc queries for shared subscriptions at this point. This functionality should be added in the future...");
}
}
} catch (RegistryHandlerException e) {
statusHandler.handle(Priority.PROBLEM,
"Failed to lookup subscriptions.", e);
}
}
/**
* Handles updates for datasets that contain cycles.
*
* @param dataSetMetaData
* the dataset metadata
* @throws ParseException
* on parsing errors
*/
@SuppressWarnings("unchecked")
private void updateDataSetMetaDataWithCycle(
DataSetMetaData<T> dataSetMetaData) throws ParseException {
BandwidthDataSetUpdate dataset = bandwidthDao
.newBandwidthDataSetUpdate(dataSetMetaData);
// Range the query for subscriptions within the baseReferenceTime hour.
// SOME models, RAP and RTMA, come not exactly on the hour. This causes these
// subscriptions to be missed because baseReferenceTimes are on the hour.
Map<String, Date> timeRange = getBaseReferenceTimeDateRange(dataset.getDataSetBaseTime());
final SortedSet<SubscriptionRetrieval> subscriptions = bandwidthDao
.getSubscriptionRetrievals(dataset.getProviderName(), dataset.getDataSetName(),
RetrievalStatus.SCHEDULED, timeRange.get(MIN_RANGE_TIME), timeRange.get(MAX_RANGE_TIME));
if (!subscriptions.isEmpty()) {
// Loop through the scheduled SubscriptionRetrievals and mark
// the scheduled retrievals as ready for retrieval
for (SubscriptionRetrieval retrieval : subscriptions) {
if (RetrievalStatus.SCHEDULED.equals(retrieval.getStatus())) {
// Need to update the Subscription Object in the
// SubscriptionRetrieval with the current DataSetMetaData
// URL and time Object
SubscriptionRetrievalAttributes<T, C> attributes = bandwidthDao
.getSubscriptionRetrievalAttributes(retrieval);
Subscription<T, C> sub;
try {
sub = updateSubscriptionWithDataSetMetaData(
attributes.getSubscription(), dataSetMetaData);
// Update the SubscriptionRetrieval record with the new
// data...
attributes.setSubscription(sub);
bandwidthDao.update(attributes);
} catch (SerializationException e) {
statusHandler
.handle(Priority.PROBLEM,
"Unable to serialize the subscription for the retrieval, skipping...",
e);
continue;
}
retrieval.setStatus(RetrievalStatus.READY);
bandwidthDaoUtil.update(retrieval);
statusHandler.info(String.format(
"Updated retrieval [%s] for "
+ "subscription [%s] to use "
+ "url [%s] and "
+ "base reference time [%s]",
retrieval.getIdentifier(), sub.getName(),
dataSetMetaData.getUrl(),
BandwidthUtil.format(sub.getTime().getStart())));
}
}
} else {
statusHandler
.debug("No Subscriptions scheduled for BandwidthDataSetUpdate ["
+ dataset.getIdentifier()
+ "] base time ["
+ BandwidthUtil.format(dataset.getDataSetBaseTime())
+ "]");
}
}
/**
* {@inheritDoc}
@ -894,6 +841,5 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
t);
}
}
}
}

View file

@ -35,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord.Sta
* Feb 07, 2013 1543 djohnson Use session management code.
* Feb 13, 2013 1543 djohnson Exported interface which is now implemented.
* Feb 22, 2013 1543 djohnson Made public as YAJSW doesn't like Spring exceptions.
* Oct 13, 2014 3707 dhladky Shared subscription delivery requires you to create a new record.
*
* </pre>
*
@ -165,7 +166,7 @@ public class RetrievalDao extends
public void completeRetrievalRequest(RetrievalRequestRecord rec)
throws DataAccessLayerException {
try {
update(rec);
createOrUpdate(rec);
} catch (HibernateException e) {
throw new DataAccessLayerException(
"Failed to update the database while changing the status on ["