Issue #2506 fixed issues with shared distributed ingest.

Amend: Updated comment in StoreRetrievedData.
       Removed separate messageing to clear BandwidthSubscriptions.
       Add fix and test case for SBN deserialization with content following the ending xml tag.
       Removed unneeded changes.
       Added XmlWMOMessage class.

Change-Id: Iadef7070a42a10661ba9316a4900a071820296fd

Former-commit-id: 80fd8a2ad4 [formerly fe15e874f7] [formerly e2fb158757 [formerly 0b667fbd94788a8a5b598ae5f5586d4b710518a4]]
Former-commit-id: e2fb158757
Former-commit-id: 28d8aa5151
This commit is contained in:
Brad Gonzales 2013-11-05 13:17:02 -06:00
parent 6228d02608
commit e802b866dd
21 changed files with 449 additions and 110 deletions

View file

@ -45,7 +45,7 @@ import com.raytheon.uf.common.datadelivery.registry.Time;
* Dec 06, 2012 1397 djohnson Add ability to get bandwidth graph data.
* Jul 11, 2013 2106 djohnson Bandwidth service now returns names of subscriptions for proposing bandwidth availability.
* Jul 18, 2013 1653 mpduff Added getSubscriptionStatusSummary.
* Oct 2, 2013 1797 dhladky Generics
* Oct 2, 2013 1797 dhladky Generics
*
* </pre>
*

View file

@ -28,6 +28,17 @@
<constructor-arg ref="retrievalDao" />
</bean>
<bean id="bandwidthInitializer"
class="com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.HibernateBandwidthInitializer">
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.FindActiveSubscriptionsForRoute">
<constructor-arg ref="SubscriptionHandler" />
<constructor-arg ref="subscriptionRoutesToSchedule" />
</bean>
</constructor-arg>
</bean>
<!-- Used as the context for creating the BandwidthManager -->
<bean id="bandwidthContextFactory"
class="com.raytheon.uf.edex.datadelivery.bandwidth.EdexBandwidthContextFactory">
@ -40,18 +51,7 @@
class="com.raytheon.uf.edex.datadelivery.bandwidth.InMemoryBandwidthBucketDao" />
</constructor-arg>
<!-- The bandwidth manager initializer -->
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.hibernate.HibernateBandwidthInitializer">
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.bandwidth.util.FindActiveSubscriptionsForRoute">
<constructor-arg ref="SubscriptionHandler" />
<constructor-arg ref="subscriptionRoutesToSchedule" />
</bean>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg ref="bandwidthInitializer" />
<!-- The strategy for how to create the bandwidth manager -->
<constructor-arg ref="bandwidthManagerCreator" />
<!-- The db initializer -->

View file

@ -121,9 +121,10 @@ import com.raytheon.uf.edex.registry.ebxml.exception.EbxmlRegistryException;
* Sep 17, 2013 2383 bgonzale Reverted back to how BandwidthManager. handles
* case for no matching dataset metadata for an
* adhoc subscription.
* Sept 25, 2013 1797 dhladky separated time from gridded time
* 10/23/2013 2385 bphillip Change schedule method to scheduleAdhoc
* Oct 30, 2013 2448 dhladky Moved methods to TimeUtil.
* Sep 25, 2013 1797 dhladky separated time from gridded time
* Oct 23, 2013 2385 bphillip Change schedule method to scheduleAdhoc
* Oct 30, 2013 2448 dhladky Moved methods to TimeUtil.
* Nov 04, 2013 2506 bgonzale Added removeBandwidthSubscriptions method.
* </pre>
*
* @author dhladky
@ -647,6 +648,26 @@ public abstract class BandwidthManager<T extends Time, C extends Coverage>
return unscheduled;
}
/**
* Remove bandwidth subscriptions for the given id.
*
* @param subscriptionId
* the bandwidth subscriptions to remove
*/
protected void removeBandwidthSubscriptions(String subscriptionId) {
statusHandler
.info("Received Subscription removal notification for Subscription ["
+ subscriptionId
+ "], removing BandwidthSubscriptions.");
// Need to locate and remove all BandwidthReservations for the
// given subscription..
List<BandwidthSubscription> l = bandwidthDao
.getBandwidthSubscriptionByRegistryId(subscriptionId);
if (!l.isEmpty()) {
remove(l);
}
}
/**
* {@inheritDoc}
*/

View file

@ -97,6 +97,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthUtil;
* Oct 1 2013 1797 dhladky Time and GriddedTime separation
* Oct 10, 2013 1797 bgonzale Refactored registry Time objects.
* 10/23/2013 2385 bphillip Change schedule method to scheduleAdhoc
* Nov 04, 2013 2506 bgonzale Added removeBandwidthSubscriptions method.
*
* </pre>
*
@ -315,13 +316,7 @@ public abstract class EdexBandwidthManager<T extends Time, C extends Coverage>
statusHandler
.info("Received Subscription removal notification for Subscription ["
+ event.getId() + "]");
// Need to locate and remove all BandwidthReservations for the
// given subscription..
List<BandwidthSubscription> l = bandwidthDao
.getBandwidthSubscriptionByRegistryId(event.getId());
if (!l.isEmpty()) {
remove(l);
}
removeBandwidthSubscriptions(event.getId());
}
}
}

View file

@ -52,6 +52,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.util.BandwidthDaoUtil;
* May 15, 2013 2000 djohnson Include daos.
* Jul 10, 2013 2106 djohnson Dependency inject registry handlers.
* Oct 2, 2013 1797 dhladky Generics
* Oct 28, 2013 2506 bgonzale SBN (Shared) Scheduled at the central registry.
*
* </pre>
*
@ -110,10 +111,9 @@ public class WfoBandwidthManagerCreator<T extends Time, C extends Coverage> impl
.proposeSchedule(subscriptions);
// If the NCF bandwidth manager says they fit without
// unscheduling anything, then schedule them at the WFO level to
// track retrievals/graphing
// unscheduling anything, then schedule them at the NCF level
if (proposeResponse.getUnscheduledSubscriptions().isEmpty()) {
scheduleSubscriptions(subscriptions);
ncfBandwidthService.schedule(subscriptions);
}
return proposeResponse;
@ -126,12 +126,9 @@ public class WfoBandwidthManagerCreator<T extends Time, C extends Coverage> impl
protected Set<String> scheduleSbnSubscriptions(
List<Subscription<T, C>> subscriptions) throws SerializationException {
final Set<String> ncfResponse = ncfBandwidthService
.schedule(subscriptions);
scheduleSubscriptions(subscriptions);
return ncfResponse;
return ncfBandwidthService.schedule(subscriptions);
}
}
/**

View file

@ -6,6 +6,7 @@ import java.util.List;
import java.util.Set;
import com.raytheon.edex.site.SiteUtil;
import com.raytheon.uf.common.datadelivery.registry.SharedSubscription;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.common.registry.ebxml.RegistryUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
@ -34,6 +35,7 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
* Sep 05, 2013 2330 bgonzale On WFO registry init, only subscribe to local site subscriptions.
* Sep 06, 2013 2344 bgonzale Removed attempt to add to immutable empty set.
* Oct 16, 2013 2267 bgonzale executeAfterRegistryInit subscribes to all local. Removed is shared checks.
* Nov 04, 2013 2506 bgonzale added site field. facilitates testing.
*
* </pre>
*
@ -47,6 +49,8 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer {
private final IFindSubscriptionsForScheduling findSubscriptionsStrategy;
private final String site;
private IBandwidthManager instance;
/**
@ -54,7 +58,18 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer {
*/
public HibernateBandwidthInitializer(
IFindSubscriptionsForScheduling findSubscriptionsStrategy) {
this(findSubscriptionsStrategy, SiteUtil.getSite());
}
/**
* @param string
* @param strategy
*/
HibernateBandwidthInitializer(
IFindSubscriptionsForScheduling findSubscriptionsStrategy,
String site) {
this.findSubscriptionsStrategy = findSubscriptionsStrategy;
this.site = site;
}
@Override
@ -87,20 +102,24 @@ public class HibernateBandwidthInitializer implements BandwidthInitializer {
Set<Subscription> activeSubscriptions = new HashSet<Subscription>();
try {
final String localOffice = SiteUtil.getSite();
final boolean isRegistry = System.getProperty("edex.run.mode")
.equals("registry");
final boolean isCentralRegistry = System.getProperty(
"edex.run.mode").equals("centralRegistry");
// Load active subscriptions
for (Subscription sub : findSubscriptionsStrategy
.findSubscriptionsToSchedule()) {
boolean isShared = (sub instanceof SharedSubscription);
boolean isLocalOffice = sub.getOfficeIDs()
.contains(localOffice);
if (isLocalOffice) {
if ((isCentralRegistry && isShared)
|| (isRegistry && isLocalOffice && !isShared)) {
activeSubscriptions.add(sub);
statusHandler.info("Scheduling Subscription: " + sub);
} else {
statusHandler
.info("Not Scheduling Non-local Subscription: "
+ sub);
statusHandler.info("Not Scheduling Subscription: " + sub);
}
}
} catch (Exception e) {

View file

@ -30,13 +30,16 @@
<constructor-arg value="${sbn.retrieval.transfer.wmo.header.dataSourceMap}" />
</bean>
</constructor-arg>
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
<constructor-arg>
<util:constant
static-field="com.raytheon.uf.edex.datadelivery.retrieval.handlers.IRetrievalResponseCompleter.NULL" />
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalResponseCompleter" >
<constructor-arg ref="subNotifyTask" />
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
<constructor-arg ref="retrievalDao" />
</bean>
<util:list id="retrievalTaskList"

View file

@ -32,7 +32,6 @@
<constructor-arg ref="retrievalDao" />
</bean>
</constructor-arg>
<constructor-arg ref="retrievalDao" />
</bean>
<!-- Pick up SBN retrievals from the drop-off point -->
@ -54,13 +53,9 @@
</bean>
</constructor-arg>
<constructor-arg>
<bean
class="com.raytheon.uf.edex.datadelivery.retrieval.handlers.RetrievalResponseCompleter">
<constructor-arg ref="subNotifyTask" />
<constructor-arg ref="retrievalDao" />
</bean>
<util:constant
static-field="com.raytheon.uf.edex.datadelivery.retrieval.handlers.IRetrievalResponseCompleter.NULL" />
</constructor-arg>
<constructor-arg ref="retrievalDao" />
</bean>
<util:list id="retrievalTaskList"

View file

@ -29,6 +29,8 @@ import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse;
import com.raytheon.uf.edex.datadelivery.retrieval.wfs.WfsRetrievalResponse;
import com.raytheon.uf.edex.wmo.message.WMOMessage;
import com.raytheon.uf.edex.wmo.message.XmlWMOMessage;
/**
* Deserializes the retrieved data in a retrievalQueue.
*
@ -42,6 +44,9 @@ import com.raytheon.uf.edex.wmo.message.WMOMessage;
* Mar 05, 2013 1647 djohnson Remove WMO header.
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
* Oct 04, 2013 2267 bgonzale Added WfsRetrieval to unmarshal classes.
* Nov 04, 2013 2506 bgonzale Added SbnRetrievalResponseXml to unmarshal classes.
* Trim content after last xml tag during
* marshaling from xml.
*
* </pre>
*
@ -62,6 +67,7 @@ public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
this.retrievalQueue = retrievalQueue;
try {
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
SbnRetrievalResponseXml.class,
OpenDapRetrievalResponse.class, WfsRetrievalResponse.class,
Coverage.class);
} catch (JAXBException e) {
@ -80,9 +86,9 @@ public class DeserializeRetrievedDataFromIngest implements IRetrievalsFinder {
if (xml == null) {
return null;
} else {
WMOMessage message = new WMOMessage(xml, new Headers());
return (RetrievalResponseXml) jaxbManager
.unmarshalFromXml(new String(message.getMessageBody()));
WMOMessage message = new XmlWMOMessage(xml, new Headers());
return (RetrievalResponseXml) jaxbManager.unmarshalFromXml(message
.getBodyText());
}
}

View file

@ -34,7 +34,8 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* ------------ ---------- ----------- --------------------------
* Feb 01, 2013 1543 djohnson Initial creation
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Oct 01, 2013 2267 bgonzale Removed request parameter. Return associated
* RetrievalRequestRecord.
*
* </pre>
*
@ -48,15 +49,15 @@ public interface IRetrievalPluginDataObjectsProcessor {
* Process plugin data objects that were created as a result of a data
* delivery retrieval request.
*
* @param request
* the request
* @param retrievalPluginDataObjects
* the retrieval plugin data objects
* @return the RetrievalRequestRecord associated with the processed
* retrievals
* @throws SerializationException
* on error with serialization
* @throws TranslationException
*/
void processRetrievedPluginDataObjects(RetrievalRequestRecord request,
RetrievalRequestRecord processRetrievedPluginDataObjects(
RetrievalResponseXml retrievalPluginDataObjects)
throws SerializationException, TranslationException;
}

View file

@ -42,6 +42,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 01, 2013 1543 djohnson Initial creation
* Nov 04, 2013 2506 bgonzale Added constructor.
*
* </pre>
*
@ -71,6 +72,15 @@ public class RetrievalResponseXml {
public RetrievalResponseXml() {
}
/**
* Constructor.
*/
public RetrievalResponseXml(RetrievalResponseXml other) {
this.requestRecord = other.requestRecord;
this.retrievalAttributePluginDataObjects = other.retrievalAttributePluginDataObjects;
this.success = other.success;
}
/**
* Constructor.
*

View file

@ -23,7 +23,6 @@ import com.raytheon.uf.common.datadelivery.registry.Network;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
/**
@ -43,7 +42,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* Feb 15, 2013 1543 djohnson Using xml for retrievals now.
* Mar 05, 2013 1647 djohnson Change no retrievals found message to debug.
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Oct 01, 2013 2267 bgonzale Removed request parameter and IRetrievalDao field.
*
* </pre>
*
@ -63,18 +62,15 @@ public class RetrievalTask implements Runnable {
private final IRetrievalsFinder retrievalDataFinder;
private final IRetrievalDao retrievalDao;
public RetrievalTask(Network network,
IRetrievalsFinder retrievalDataFinder,
IRetrievalPluginDataObjectsProcessor retrievedDataProcessor,
IRetrievalResponseCompleter retrievalCompleter,
IRetrievalDao retrievalDao) {
IRetrievalResponseCompleter retrievalCompleter) {
this.network = network;
this.retrievalDataFinder = retrievalDataFinder;
this.retrievedDataProcessor = retrievedDataProcessor;
this.retrievalCompleter = retrievalCompleter;
this.retrievalDao = retrievalDao;
}
@Override
@ -99,11 +95,9 @@ public class RetrievalTask implements Runnable {
return;
}
request = retrievalDao.getById(retrievalPluginDataObject
.getRequestRecord());
success = retrievalPluginDataObject.isSuccess();
retrievedDataProcessor.processRetrievedPluginDataObjects(
request, retrievalPluginDataObject);
request = retrievedDataProcessor
.processRetrievedPluginDataObjects(retrievalPluginDataObject);
} catch (Exception e) {
statusHandler.error(
network + " retrieval processing error", e);

View file

@ -0,0 +1,106 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
/**
* Associates plugin data objects with their retrieval request. Specifically for
* SBN retrievals passed to clients from the central registry.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 28, 2013 2506 bgonzale Initial creation
*
* </pre>
*
* @author bgonzale
* @version 1.0
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.NONE)
@DynamicSerialize
public class SbnRetrievalResponseXml extends RetrievalResponseXml {
@XmlElement
@DynamicSerializeElement
private RetrievalRequestRecord retrievalRequestRecord;
/**
* Constructor.
*/
public SbnRetrievalResponseXml() {
}
/**
* Constructor.
*
* @param requestRecord
* @param retrievalAttributePluginDataObjects
*/
public SbnRetrievalResponseXml(RetrievalRequestRecordPK requestRecord,
List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects,
RetrievalRequestRecord retrievalRequestRecord) {
super(requestRecord, retrievalAttributePluginDataObjects);
this.retrievalRequestRecord = retrievalRequestRecord;
}
/**
* Constructor
*
* @param request
* @param retrievalPluginDataObjects
*/
SbnRetrievalResponseXml(RetrievalRequestRecord request,
RetrievalResponseXml retrievalPluginDataObjects) {
super(retrievalPluginDataObjects);
this.retrievalRequestRecord = request;
}
/**
* @return the retrievalRequestRecord
*/
public RetrievalRequestRecord getRetrievalRequestRecord() {
return retrievalRequestRecord;
}
/**
* @param retrievalRequestRecord
* the retrievalRequestRecord to set
*/
public void setRetrieval(RetrievalRequestRecord retrievalRequestRecord) {
this.retrievalRequestRecord = retrievalRequestRecord;
}
}

View file

@ -30,6 +30,7 @@ import com.raytheon.uf.common.datadelivery.registry.Coverage;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.opendap.OpenDapRetrievalResponse;
import com.raytheon.uf.edex.datadelivery.retrieval.wfs.WfsRetrievalResponse;
@ -48,7 +49,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.wfs.WfsRetrievalResponse;
* Mar 05, 2013 1647 djohnson Apply WMO header.
* Mar 07, 2013 1647 djohnson Write out as hidden file, then rename.
* Aug 09, 2013 1822 bgonzale Added parameters to IWmoHeaderApplier.applyWmoHeader().
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Oct 28, 2013 2506 bgonzale Removed request parameters. Constructor inject IRetrievalDao.
*
* </pre>
*
@ -64,15 +65,19 @@ public class SerializeRetrievedDataToDirectory implements
private final IWmoHeaderApplier wmoHeaderWrapper;
private final IRetrievalDao retrievalDao;
/**
* @param directory
*/
public SerializeRetrievedDataToDirectory(File directory,
IWmoHeaderApplier wmoHeaderWrapper) {
IWmoHeaderApplier wmoHeaderWrapper, IRetrievalDao retrievalDao) {
this.targetDirectory = directory;
this.wmoHeaderWrapper = wmoHeaderWrapper;
this.retrievalDao = retrievalDao;
try {
this.jaxbManager = new JAXBManager(RetrievalResponseXml.class,
SbnRetrievalResponseXml.class,
OpenDapRetrievalResponse.class, WfsRetrievalResponse.class,
Coverage.class);
} catch (JAXBException e) {
@ -82,10 +87,12 @@ public class SerializeRetrievedDataToDirectory implements
/**
* {@inheritDoc}
*
* @return the RetrievalRequestRecord associated with the response
* processing.
*/
@Override
public void processRetrievedPluginDataObjects(
RetrievalRequestRecord request,
public RetrievalRequestRecord processRetrievedPluginDataObjects(
RetrievalResponseXml retrievalPluginDataObjects)
throws SerializationException {
retrievalPluginDataObjects.prepareForSerialization();
@ -96,8 +103,11 @@ public class SerializeRetrievedDataToDirectory implements
final File tempHiddenFile = new File(finalFile.getParentFile(), "."
+ finalFile.getName());
final RetrievalRequestRecord request = retrievalDao
.getById(retrievalPluginDataObjects.getRequestRecord());
final String xml = jaxbManager
.marshalToXml(retrievalPluginDataObjects);
.marshalToXml(new SbnRetrievalResponseXml(request,
retrievalPluginDataObjects));
final Date date = request.getInsertTime();
final String textForFile = wmoHeaderWrapper
.applyWmoHeader(request.getProvider(), request.getPlugin(),
@ -113,6 +123,7 @@ public class SerializeRetrievedDataToDirectory implements
+ tempHiddenFile.getAbsolutePath() + "] to ["
+ finalFile.getAbsolutePath() + "]");
}
return request;
} catch (Exception e) {
throw new SerializationException(e);
}

View file

@ -62,7 +62,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.util.RetrievalPersistUtil;
* Feb 15, 2013 1543 djohnson Retrieve the retrieval attributes from the database.
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Aug 06, 2013 1654 bgonzale Added AdhocDataRetrievalEvent.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Oct 01, 2013 2267 bgonzale Removed request parameter. Return RetrievalRequestRecord.
*
* </pre>
*
@ -95,19 +95,25 @@ public class StoreRetrievedData implements IRetrievalPluginDataObjectsProcessor
* {@inheritDoc}
*/
@Override
public void processRetrievedPluginDataObjects(
RetrievalRequestRecord request,
public RetrievalRequestRecord processRetrievedPluginDataObjects(
RetrievalResponseXml retrievalPluginDataObjects)
throws SerializationException, TranslationException {
Map<String, PluginDataObject[]> pluginDataObjects = Maps.newHashMap();
final RetrievalRequestRecordPK id = retrievalPluginDataObjects
.getRequestRecord();
final RetrievalRequestRecord requestRecord = retrievalDao.getById(id);
RetrievalRequestRecord requestRecord = null;
if (retrievalPluginDataObjects instanceof SbnRetrievalResponseXml) {
requestRecord = ((SbnRetrievalResponseXml) retrievalPluginDataObjects)
.getRetrievalRequestRecord();
} else {
requestRecord = retrievalDao.getById(id);
}
if (requestRecord == null) {
statusHandler.warn("Unable to find retrieval by id [" + id
+ "]! Retrieval will not be processed...");
return;
throw new SerializationException(
"Invalid or missing retrieval found for Response id [" + id
+ " ] XML from Central Registry");
}
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
@ -180,17 +186,17 @@ public class StoreRetrievedData implements IRetrievalPluginDataObjectsProcessor
EventBus.publish(event);
sendToDestinationForStorage(requestRecord, records);
sendToDestinationForStorage(records);
}
}
return requestRecord;
}
/**
* Sends the plugin data objects to their configured destination for storage
* to the database.
*/
public void sendToDestinationForStorage(
RetrievalRequestRecord requestRecord, PluginDataObject[] pdos) {
private void sendToDestinationForStorage(PluginDataObject[] pdos) {
String pluginName = pdos[0].getPluginName();
if (pluginName != null) {

View file

@ -33,6 +33,7 @@ import com.raytheon.edex.esb.Headers;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 14, 2009 jkorman Initial creation
* Nov 08, 2013 2506 bgonzale Setting messageBody is done only through setter method.
*
* </pre>
*
@ -67,9 +68,6 @@ public class WMOMessage implements Serializable {
*/
public WMOMessage(String wmoMessage, Headers headers) {
this(wmoMessage.getBytes(), headers);
if (messageBody != null) {
bodyText = new String(messageBody);
}
}
/**
@ -84,9 +82,10 @@ public class WMOMessage implements Serializable {
wmoHeader = header;
int bodyLen = wmoMessage.length - header.getMessageDataStart();
messageBody = new byte[bodyLen];
byte[] messageBodyData = new byte[bodyLen];
System.arraycopy(wmoMessage, header.getMessageDataStart(),
messageBody, 0, bodyLen);
messageBodyData, 0, bodyLen);
setMessageBody(messageBodyData);
}
}
}
@ -267,7 +266,7 @@ public class WMOMessage implements Serializable {
}
/**
* The the binary data that comprises the body of this message.
* Get the binary data that comprises the body of this message.
*
* @return The binary data.
*/
@ -276,7 +275,7 @@ public class WMOMessage implements Serializable {
}
/**
* The the binary data that comprises the body of this message.
* Set the binary data that comprises the body of this message.
*
* @return The binary data.
*/

View file

@ -0,0 +1,91 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.wmo.message;
import java.util.Arrays;
import com.raytheon.edex.esb.Headers;
/**
* WMOMessage that handles XML. Specifically, removes data trailing the ending
* XML tag.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 8, 2013 2506 bgonzale Initial creation
*
* </pre>
*
* @author bgonzale
* @version 1.0
*/
public class XmlWMOMessage extends WMOMessage {
/**
* Default Constructor.
*/
public XmlWMOMessage() {
}
/**
* @param wmoMessage
* @param headers
*/
public XmlWMOMessage(String wmoMessage, Headers headers) {
super(wmoMessage, headers);
}
/**
* @param wmoMessage
* @param headers
*/
public XmlWMOMessage(byte[] wmoMessage, Headers headers) {
super(wmoMessage, headers);
}
/**
* Set the binary data that comprises the body of this message.
*
* @return The binary data.
*/
public void setMessageBody(byte[] binaryData) {
super.setMessageBody(getMessageBodyXML(binaryData));
}
private byte[] getMessageBodyXML(byte[] messageBody) {
// assumes last '>' is a part of the last xml tag
final char LAST_XML_CHARACTER = '>';
int lastIndex = messageBody.length;
for (int i = messageBody.length - 1; i > -1; --i) {
if (messageBody[i] == LAST_XML_CHARACTER) {
lastIndex = i + 1;
break;
}
}
return Arrays.copyOf(messageBody, lastIndex);
}
}

View file

@ -23,14 +23,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Sets;
import com.raytheon.uf.common.datadelivery.registry.DataType;
import com.raytheon.uf.common.datadelivery.registry.SiteSubscriptionFixture;
import com.raytheon.uf.common.datadelivery.registry.Subscription;
import com.raytheon.uf.edex.core.props.EnvAttributePropertyInjector;
import com.raytheon.uf.edex.datadelivery.bandwidth.IBandwidthManager;
import com.raytheon.uf.edex.datadelivery.bandwidth.dao.IBandwidthDbInit;
import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
@ -48,7 +46,9 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
* Apr 18, 2013 1914 djohnson Fix broken test.
* Jun 25, 2013 2106 djohnson init() now takes a {@link RetrievalManager}.
* Sep 06, 2013 2344 bgonzale Added property injection of valid test value.
* Oct 21, 2013 2292 mpduff Implement multiple data types.
* Oct 21, 2013 2292 mpduff Implement multiple data types.
* Nov 04, 2013 2506 bgonzale Added site parameter to HibernateBandwidthInitializer
* constructor.
*
* </pre>
*
@ -57,13 +57,6 @@ import com.raytheon.uf.edex.datadelivery.bandwidth.retrieval.RetrievalManager;
*/
public class HibernateBandwidthInitializerTest {
@Before
public void setup() {
System.setProperty("edex.home", "../edexOsgi/build.edex/esb/");
EnvAttributePropertyInjector.injectAttributeProperty("SITENAME",
"sitename", "OAX");
}
@Test
public void testSchedulesAllSubscriptionReturnedFromIFindSubscriptions()
throws Exception {
@ -79,7 +72,7 @@ public class HibernateBandwidthInitializerTest {
IBandwidthDbInit dbInit = mock(IBandwidthDbInit.class);
final HibernateBandwidthInitializer initializer = new HibernateBandwidthInitializer(
strategy);
strategy, "OAX");
initializer
.init(bandwidthManager, dbInit, mock(RetrievalManager.class));
initializer.executeAfterRegistryInit();

View file

@ -24,22 +24,29 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.TestUtil;
import com.raytheon.uf.common.util.file.FilenameFilters;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
/**
* Test {@link DeserializeRetrievedDataFromIngest}.
@ -57,6 +64,9 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* Mar 19, 2013 1794 djohnson Read from a queue rather than the file system.
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Nov 04, 2013 2506 bgonzale Fixed IRetreivalDao mock initialization.
* Test deserialization of data with leading and trailing
* content on the xml.
*
* </pre>
*
@ -72,11 +82,21 @@ public class DeserializeRetrievedDataFromIngestTest {
private final DeserializeRetrievedDataFromIngest service = new DeserializeRetrievedDataFromIngest(
retrievalQueue);
private final IRetrievalDao mockDao = mock(IRetrievalDao.class);
@BeforeClass
public static void classSetUp() {
PathManagerFactoryTest.initLocalization();
}
@Before
public void setup() {
when(mockDao.getById((RetrievalRequestRecordPK) Matchers.anyObject()))
.thenReturn(
RetrievalRequestRecordFixture.INSTANCE.getInstance(0,
new Random(0)));
}
@Test
public void deserializesRetrievedDataFromTheQueue()
throws Exception {
@ -110,6 +130,19 @@ public class DeserializeRetrievedDataFromIngestTest {
assertNull(restored);
}
@Test
public void attemptIngestWhenDataHasLeadingAndTrailingContent()
throws Exception {
addSimulatedSBNRetrievalToQueue();
final RetrievalResponseXml restored = service.findRetrievals();
// check for the payload
assertThat(restored.getRetrievalAttributePluginDataObjects().get(0)
.getRetrievalResponse(), is(notNullValue()));
}
private void addRetrievalToQueue() throws SerializationException,
IOException {
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
@ -120,12 +153,51 @@ public class DeserializeRetrievedDataFromIngestTest {
request.setInsertTime(new Date());
new SerializeRetrievedDataToDirectory(directory,
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"))
.processRetrievedPluginDataObjects(request,
retrievalPluginDataObjects);
new AlwaysSameWmoHeader("SMYG10 LYBM 280000"), mockDao)
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
final List<File> files = FileUtil.listFiles(directory,
FilenameFilters.ACCEPT_FILES, false);
retrievalQueue.add(FileUtil.file2String(files.get(0)));
}
private void addSimulatedSBNRetrievalToQueue()
throws SerializationException,
IOException {
RetrievalResponseXml retrievalPluginDataObjects = RetrievalPluginDataObjectsFixture.INSTANCE
.get();
RetrievalRequestRecord request = new RetrievalRequestRecord();
request.setProvider("");
request.setPlugin("");
request.setInsertTime(new Date());
new SerializeRetrievedDataToDirectory(directory,
new WmoHeaderWithLeadingAndTrailingContent("SMYG10 LYBM 280000"),
mockDao)
.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
final List<File> files = FileUtil.listFiles(directory,
FilenameFilters.ACCEPT_FILES, false);
retrievalQueue.add(FileUtil.file2String(files.get(0)));
}
private static class WmoHeaderWithLeadingAndTrailingContent extends
AlwaysSameWmoHeader {
public WmoHeaderWithLeadingAndTrailingContent(String wmoHeader) {
super(wmoHeader);
}
@Override
public String applyWmoHeader(String dataProvider, String dataFormat,
String sourceType, Date date, String data) {
String SBN_noise_prefix = "001\r\r\n747\r\r\n";
String SBN_noise_suffix = "\n\r\r\n 003";
String output = super.applyWmoHeader(dataProvider, dataFormat,
sourceType,
date, data);
return SBN_noise_prefix + output + SBN_noise_suffix;
}
}
}

View file

@ -88,6 +88,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.interfaces.IRetrievalResponse
* Apr 29, 2013 1910 djohnson Unregister from EventBus after each test.
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Nov 04, 2013 2506 bgonzale removed IRetrievalDao parameter.
*
* </pre>
*
@ -108,10 +109,11 @@ public class RetrievalTaskTest {
/**
* {@inheritDoc}
*
* @return RetrievalRequestRecord
*/
@Override
public void processRetrievedPluginDataObjects(
RetrievalRequestRecord request,
public RetrievalRequestRecord processRetrievedPluginDataObjects(
RetrievalResponseXml retrievalPluginDataObjects)
throws SerializationException, TranslationException {
final List<RetrievalResponseWrapper> retrievalAttributePluginDataObjects = retrievalPluginDataObjects
@ -143,6 +145,7 @@ public class RetrievalTaskTest {
pluginDataObjects.addAll(Arrays.asList(pdos));
}
}
return requestRecord;
}
}
@ -242,15 +245,16 @@ public class RetrievalTaskTest {
final File testDirectory = TestUtil
.setupTestClassDir(RetrievalTaskTest.class);
IRetrievalPluginDataObjectsProcessor serializeToDirectory = new SerializeRetrievedDataToDirectory(
testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"));
testDirectory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"),
dao);
RetrievalTask downloadTask = new RetrievalTask(Network.OPSNET,
retrievalDataFinder, serializeToDirectory,
mock(IRetrievalResponseCompleter.class), dao);
mock(IRetrievalResponseCompleter.class));
RetrievalTask readDownloadsTask = new RetrievalTask(Network.OPSNET,
new DeserializeRetrievedDataFromIngest(retrievalQueue),
retrievedDataProcessor, new RetrievalResponseCompleter(
mock(SubscriptionNotifyTask.class), dao), dao);
mock(SubscriptionNotifyTask.class), dao));
downloadTask.run();
@ -298,7 +302,7 @@ public class RetrievalTaskTest {
mock(SubscriptionNotifyTask.class), dao);
new RetrievalTask(Network.OPSNET, retrievalDataFinder,
retrievedDataProcessor, retrievalCompleter, dao).run();
retrievedDataProcessor, retrievalCompleter).run();
}
/**

View file

@ -21,17 +21,24 @@ package com.raytheon.uf.edex.datadelivery.retrieval.handlers;
import static com.raytheon.uf.common.util.Matchers.hasNumberOfFiles;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.Date;
import java.util.Random;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import com.raytheon.uf.common.localization.PathManagerFactoryTest;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.util.TestUtil;
import com.raytheon.uf.edex.datadelivery.retrieval.db.IRetrievalDao;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecordPK;
/**
* Test {@link SerializeRetrievedDataToDirectory}.
@ -48,6 +55,7 @@ import com.raytheon.uf.edex.datadelivery.retrieval.db.RetrievalRequestRecord;
* Aug 09, 2013 1822 bgonzale Added parameters to processRetrievedPluginDataObjects.
* Oct 01, 2013 2267 bgonzale Pass request parameter instead of components of request.
* Add test for wfs retrieval.
* Nov 04, 2013 2506 bgonzale removed IRetrievalDao and request parameters.
*
* </pre>
*
@ -59,14 +67,24 @@ public class SerializeRetrievedDataToDirectoryTest {
private final File directory = TestUtil
.setupTestClassDir(SerializeRetrievedDataToDirectoryTest.class);
private final IRetrievalDao mockDao = mock(IRetrievalDao.class);
private final SerializeRetrievedDataToDirectory service = new SerializeRetrievedDataToDirectory(
directory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"));
directory, new AlwaysSameWmoHeader("SMYG10 LYBM 280000"), mockDao);
@BeforeClass
public static void classSetUp() {
PathManagerFactoryTest.initLocalization();
}
@Before
public void setup() {
when(mockDao.getById((RetrievalRequestRecordPK) Matchers.anyObject()))
.thenReturn(
RetrievalRequestRecordFixture.INSTANCE.getInstance(0,
new Random(0)));
}
@Test
public void serializesRetrievedDataToAFileInTheTargetDirectory()
throws SerializationException {
@ -79,8 +97,7 @@ public class SerializeRetrievedDataToDirectoryTest {
// "Model"
request.setInsertTime(new Date());
service.processRetrievedPluginDataObjects(request,
retrievalPluginDataObjects);
service.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
assertThat(directory, hasNumberOfFiles(1));
}
@ -97,8 +114,7 @@ public class SerializeRetrievedDataToDirectoryTest {
// "Model"
request.setInsertTime(new Date());
service.processRetrievedPluginDataObjects(request,
retrievalPluginDataObjects);
service.processRetrievedPluginDataObjects(retrievalPluginDataObjects);
assertThat(directory, hasNumberOfFiles(1));
}