Issue #1912 increase bulk requests to pypies.

Former-commit-id: 0e59516135 [formerly 2b11b35462 [formerly 50da745da0b9e963c3505512b86b3c59101f8ed7]]
Former-commit-id: 2b11b35462
Former-commit-id: 82d2c832b1
This commit is contained in:
Ben Steffensmeier 2013-04-16 16:38:52 -05:00 committed by Gerrit Code Review
parent 5c67134d7b
commit 15a88488ca
4 changed files with 195 additions and 207 deletions

View file

@ -7,6 +7,7 @@ import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -375,6 +376,11 @@ public class FFMPMonitor extends ResourceMonitor {
"FFMP Can't retrieve FFMP URI, " + uri, e);
}
SourceXML sourceXml = fscm.getSource(source);
if (sourceXml.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
source = sourceXml.getDisplayName();
}
return siteDataMap.get(siteKey).getSourceData(source).getRecord();
}
@ -390,7 +396,8 @@ public class FFMPMonitor extends ResourceMonitor {
NavigableMap<Date, List<String>> uris, String siteKey, String source) {
// get record from cache
FFMPSourceData sourceData = siteDataMap.get(siteKey).getSourceData(source);
FFMPSourceData sourceData = siteDataMap.get(siteKey).getSourceData(
source);
FFMPRecord curRecord = sourceData.getRecord();
if (curRecord == null) {
@ -398,14 +405,14 @@ public class FFMPMonitor extends ResourceMonitor {
for (String huc : data.getBasinsMap().keySet()) {
// add all of the uris
for (Entry<Date, List<String>> duris : uris.entrySet()) {
for (String uri : duris.getValue()) {
if (curRecord == null) {
curRecord = new FFMPRecord(uri);
sourceData.setRecord(curRecord);
if (data.getTimes().contains(duris.getKey().getTime())) {
for (String uri : duris.getValue()) {
if (curRecord == null) {
curRecord = new FFMPRecord(uri);
sourceData.setRecord(curRecord);
}
sourceData.addLoadedUri(huc, uri);
}
sourceData.addLoadedUri(huc, uri);
}
}
}
@ -1672,16 +1679,18 @@ public class FFMPMonitor extends ResourceMonitor {
*/
public void processUri(boolean isProductLoad, String uri, String siteKey,
String sourceName, Date barrierTime, String phuc) {
SourceXML source = getSourceConfig().getSource(sourceName);
if (uri != null) {
try {
FFMPRecord record = populateFFMPRecord(isProductLoad, uri,
siteKey, sourceName, phuc);
if ((record != null) && (source != null)) {
record.setExpiration(source.getExpirationMinutes(siteKey));
record.setRate(source.isRate());
if (record != null) {
record.getBasinData(phuc).loadNow();
SourceXML source = getSourceConfig().getSource(sourceName);
if (source != null) {
record.setExpiration(source
.getExpirationMinutes(siteKey));
record.setRate(source.isRate());
}
}
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
@ -2280,46 +2289,48 @@ public class FFMPMonitor extends ResourceMonitor {
}
public void run() {
SourceXML source = getSourceConfig().getSource(fsourceName);
if (furiMap != null) {
SourceXML source = getSourceConfig().getSource(fsourceName);
boolean isGuidance = false;
if (source != null
&& source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
isGuidance = true;
}
List<String> loadedUris = getLoadedUris(fsiteKey, fsourceName,
fhuc);
Set<FFMPRecord> populatedRecords = new HashSet<FFMPRecord>();
for (List<String> uris : furiMap.descendingMap().values()) {
for (String uri : uris) {
if (uri != null) {
FFMPRecord record = new FFMPRecord(uri);
if (record.getDataTime().getRefTime()
.after(fbarrierTime)
|| source
.getSourceType()
.equals(FFMPSourceConfigurationManager.SOURCE_TYPE.GUIDANCE
.getSourceType())) {
try {
if (!getLoadedUris(fsiteKey, fsourceName,
fhuc).contains(uri)) {
record = populateFFMPRecord(
fisProductLoad, uri, fsiteKey,
fsourceName, fhuc);
if ((record != null)
&& (source != null)) {
record.setExpiration(source
.getExpirationMinutes(fsiteKey));
record.setRate(source.isRate());
}
if (uri == null || loadedUris.contains(uri)) {
continue;
}
FFMPRecord record = new FFMPRecord(uri);
if (record.getDataTime().getRefTime()
.after(fbarrierTime)
|| isGuidance) {
try {
record = populateFFMPRecord(fisProductLoad,
uri, fsiteKey, fsourceName, fhuc);
if (record != null) {
populatedRecords.add(record);
if (source != null) {
record.setExpiration(source
.getExpirationMinutes(fsiteKey));
record.setRate(source.isRate());
}
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"FFMP Can't retrieve FFMP URI, "
+ uri, e);
}
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"FFMP Can't retrieve FFMP URI, " + uri,
e);
}
}
}
}
for (FFMPRecord record : populatedRecords) {
record.getBasinData(fhuc).loadNow();
}
}
}
}

View file

@ -67,6 +67,8 @@ import com.raytheon.uf.viz.monitor.ffmp.ui.listeners.FFMPLoaderEvent;
* Feb 28, 2013 1729 dhladky Changed the way status messages are sent to the FFMP Dialog.
* Mar 6, 2013 1769 dhladky Changed threading to use count down latch.
* Apr 9, 2013 1890 dhladky removed loading of phantom Virtual template and cache file processing.
* Apr 18, 2013 1912 bsteffen Increase bulk requests to pypies.
*
* </pre>
*
* @author dhladky
@ -172,8 +174,8 @@ public class FFMPDataLoader extends Thread {
boolean isProductLoad = true;
String rateURI = null;
if ((loadType == LOADER_TYPE.INITIAL)
|| (loadType == LOADER_TYPE.GENERAL)) {
if ((loadType == LOADER_TYPE.INITIAL || loadType == LOADER_TYPE.GENERAL)
&& !product.getRate().equals(product.getQpe())) {
rateURI = monitor.getAvailableUri(siteKey, dataKey,
product.getRate(), mostRecentTime);
}
@ -267,7 +269,7 @@ public class FFMPDataLoader extends Thread {
SourceXML source = sourceConfig.getSource(product.getQpe());
qpeCache = readAggregateRecord(source, dataKey, wfo);
qpeCache = readAggregateRecord(source, dataKey, wfo);
if (qpeCache != null) {
monitor.insertFFMPData(qpeCache, qpeURIs, siteKey, product.getQpe());
@ -276,7 +278,7 @@ public class FFMPDataLoader extends Thread {
// Use this method of QPE data retrieval if you don't have cache
// files
if (!qpeURIs.isEmpty() && qpeCache == null) {
if (!qpeURIs.isEmpty()) {
for (String phuc : hucsToLoad) {
if (phuc.equals(layer) || phuc.equals(FFMPRecord.ALL)) {
monitor.processUris(qpeURIs, isProductLoad, siteKey,
@ -303,38 +305,16 @@ public class FFMPDataLoader extends Thread {
qpfCache = readAggregateRecord(source, pdataKey, wfo);
if (qpfCache != null) {
for (String phuc : hucsToLoad) {
if ((phuc.equals(layer) || phuc
.equals(FFMPRecord.ALL))
&& loadType == LOADER_TYPE.INITIAL
&& source.getSourceName().equals(
config.getFFMPConfigData()
.getIncludedQPF())) {
if (!qpfURIs.isEmpty()) {
monitor.processUris(qpfURIs, isProductLoad,
siteKey, source.getSourceName(),
timeBack, phuc);
}
}
}
monitor.insertFFMPData(qpfCache, qpfURIs, siteKey,
source.getSourceName());
}
}
// if (isUrisProcessNeeded(qpfData,qpfURIs))
// {/*DR13839*/
// Use this method of QPF data retrieval if you don't have cache
// files
if ((qpfCache == null) && !qpfURIs.isEmpty()) {
if (!qpfURIs.isEmpty()) {
for (String phuc : hucsToLoad) {
if (phuc.equals(layer) || phuc.equals(FFMPRecord.ALL)) { // old
// code:
// keep
// for
// reference*/
// if (isHucProcessNeeded(phuc)) {/*DR13839*/
monitor.processUris(qpfURIs, isProductLoad,
siteKey, product.getQpf(i), timeBack, phuc);
}

View file

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPDataRecordLoader.LoadTask;
import com.raytheon.uf.common.datastorage.DataStoreFactory;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager.SOURCE_TYPE;
@ -72,8 +73,16 @@ public class FFMPBasinData implements ISerializableObject {
@DynamicSerializeElement
private Map<Long, FFMPBasin> basins = new HashMap<Long, FFMPBasin>();
/**
* Pending load tasks that need to be run to fully populate basins
*/
private List<LoadTask> tasks = new ArrayList<LoadTask>();
/**
* Cache of basins in order for easy population from Load Tasks.
*/
private Map<String, FFMPBasin[]> orderedBasinsCache = new HashMap<String, FFMPBasin[]>();
/**
* Public one arg constructor
*
@ -107,10 +116,11 @@ public class FFMPBasinData implements ISerializableObject {
*
* @param basins
*/
public void setBasins(HashMap<Long, FFMPBasin> basins) {
public void setBasins(Map<Long, FFMPBasin> basins) {
if (!tasks.isEmpty()) {
synchronized (tasks) {
tasks.clear();
orderedBasinsCache.clear();
}
}
this.basins = basins;
@ -581,13 +591,19 @@ public class FFMPBasinData implements ISerializableObject {
}
/**
* Add basins from datasetGroupPath in datastoreFile. The basins will not be
* Add basins some basins from a datastoreFile. The basins will not be
* loaded immediately, they will be loaded when they are needed.
*
* @param datastoreFile
* - the file containing data.
* @param datasetGroupPath
* - the datasetGroupPath where the data is
* @param uri
* - datauri of record to load
* @param siteKey
* - siteKey to load
* @param cwa
* - cwa to load
* @param huc
* - huc to load
* @param sourceName
* - the sourceName for the data.
* @param date
@ -596,60 +612,98 @@ public class FFMPBasinData implements ISerializableObject {
* - a collection of Longs which is in the same order as the data
* in the dataStore.
* @param aggregate
* -
*/
public void addBasins(File datastoreFile, String datasetGroupPath,
String sourceName, Date date, Collection<Long> orderedPfafs,
boolean aggregate) {
public void addBasins(File datastoreFile, String uri, String siteKey,
String cwa, String huc, String sourceName, Date date,
Collection<Long> orderedPfafs, boolean aggregate) {
SourceXML source = FFMPSourceConfigurationManager.getInstance()
.getSource(sourceName);
boolean guidance = source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType());
String basinsKey = siteKey + ' ' + cwa + ' ' + huc;
String datasetGroupPath = uri + DataStoreFactory.DEF_SEPARATOR + cwa
+ DataStoreFactory.DEF_SEPARATOR + huc;
synchronized (tasks) {
// clone orderedPfafs to protect from concurrency issues.
orderedPfafs = new ArrayList<Long>(orderedPfafs);
SourceXML source = FFMPSourceConfigurationManager.getInstance()
.getSource(sourceName);
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
FFMPBasin[] basins = this.orderedBasinsCache.get(basinsKey);
if (basins == null) {
basins = new FFMPBasin[orderedPfafs.size()];
int j = 0;
for (Long pfaf : orderedPfafs) {
FFMPBasin basin = this.basins.get(pfaf);
if (basin == null) {
if (guidance) {
basin = new FFMPGuidanceBasin(pfaf, aggregate);
} else {
basin = new FFMPBasin(pfaf, aggregate);
}
this.basins.put(pfaf, basin);
}
basins[j++] = basin;
}
this.orderedBasinsCache.put(basinsKey, basins);
}
if (guidance) {
tasks.add(new LoadGuidanceMapTask(datastoreFile,
datasetGroupPath, orderedPfafs, date, aggregate,
sourceName));
datasetGroupPath, basins, date, sourceName));
} else {
tasks.add(new LoadMapTask(datastoreFile, datasetGroupPath,
orderedPfafs, date, aggregate));
basins, date));
}
}
}
/**
* Add virtual basins from datasetGroupPath in datastoreFile. The basins
* will not be loaded immediately, they will be loaded when they are needed.
* Add virtual basins from a datastoreFile. The basins will not be loaded
* immediately, they will be loaded when they are needed.
*
* @param datastoreFile
* - the file containing data.
* @param datasetGroupPath
* - the datasetGroupPath where the data is
* @param uri
* - datauri of record to load
* @param dataKey
* - dataKey to load
* @param cwa
* - cwa to load
* @param date
* - the date of the data.
* @param orderedMetadata
* - a collection of FFMPVirtualGageBasinMetaData which is in the
* same order as the data in the dataStore.
*/
public void addVirtualBasins(File datastoreFile, String datasetGroupPath,
Date date, Collection<FFMPVirtualGageBasinMetaData> orderedMetadata) {
// clone ordered metadata to protect from concurrency issues.
public void addVirtualBasins(File datastoreFile, String uri,
String dataKey, String cwa, Date date,
Collection<FFMPVirtualGageBasinMetaData> orderedMetadata) {
String basinsKey = dataKey + ' ' + cwa;
String datasetGroupPath = uri + DataStoreFactory.DEF_SEPARATOR + cwa
+ DataStoreFactory.DEF_SEPARATOR + FFMPRecord.ALL;
synchronized (tasks) {
orderedMetadata = new ArrayList<FFMPVirtualGageBasinMetaData>(
orderedMetadata);
FFMPBasin[] basins = this.orderedBasinsCache.get(basinsKey);
if (basins == null) {
basins = new FFMPBasin[orderedMetadata.size()];
int j = 0;
for (FFMPVirtualGageBasinMetaData fvgbmd : orderedMetadata) {
FFMPBasin basin = this.basins.get(fvgbmd.getLookupId());
if (basin == null) {
basin = new FFMPVirtualGageBasin(fvgbmd.getLid(),
fvgbmd.getLookupId(), false);
this.basins.put(fvgbmd.getLookupId(), basin);
}
basins[j++] = basin;
}
this.orderedBasinsCache.put(basinsKey, basins);
}
tasks.add(new LoadVirtualMapTask(datastoreFile, datasetGroupPath,
date, orderedMetadata));
basins, date));
}
}
public void loadNow() {
synchronized (tasks) {
if (!tasks.isEmpty()) {
System.out.println("Loading tasks: " + tasks.size() + " "
+ this);
FFMPDataRecordLoader.loadRecords(tasks);
tasks.clear();
orderedBasinsCache.clear();
}
}
}
@ -659,43 +713,36 @@ public class FFMPBasinData implements ISerializableObject {
*/
private class LoadMapTask extends LoadTask {
protected final Collection<Long> orderedPfafs;
protected final FFMPBasin[] basins;
protected final Date date;
protected final boolean aggregate;
public LoadMapTask(File datastoreFile, String datasetGroupPath,
Collection<Long> orderedPfafs, Date date, boolean aggregate) {
FFMPBasin[] basins, Date date) {
super(datastoreFile, datasetGroupPath);
this.orderedPfafs = orderedPfafs;
this.basins = basins;
this.date = date;
this.aggregate = aggregate;
}
@Override
public void process(FloatDataRecord record) {
float[] values = record.getFloatData();
int j = 0;
for (Long pfaf : orderedPfafs) {
FFMPBasin basin = basins.get(pfaf);
if (basin == null) {
basin = new FFMPBasin(pfaf, aggregate);
basins.put(pfaf, basin);
}
for (int j = 0; j < values.length; j += 1) {
applyValue(basins[j], values[j]);
}
}
if (basin.contains(date)) {
float curval = basin.getValue(date);
if (curval >= 0.0f && values[j] >= 0.0f) {
basin.setValue(date, (curval + values[j]) / 2);
} else if (values[j] >= 0.0f) {
basin.setValue(date, values[j]);
} // do not overwrite original value
} else {
// no value at time exists, write regardless
basin.setValue(date, values[j]);
}
j++;
protected void applyValue(FFMPBasin basin, float value) {
if (basin.contains(date)) {
float curval = basin.getValue(date);
if (curval >= 0.0f && value >= 0.0f) {
basin.setValue(date, (curval + value) / 2);
} else if (value >= 0.0f) {
basin.setValue(date, value);
} // do not overwrite original value
} else {
// no value at time exists, write regardless
basin.setValue(date, value);
}
}
}
@ -708,41 +755,27 @@ public class FFMPBasinData implements ISerializableObject {
private final String sourceName;
public LoadGuidanceMapTask(File datastoreFile, String datasetGroupPath,
Collection<Long> orderedPfafs, Date date, boolean aggregate,
String sourceName) {
super(datastoreFile, datasetGroupPath, orderedPfafs, date,
aggregate);
FFMPBasin[] basins, Date date, String sourceName) {
super(datastoreFile, datasetGroupPath, basins, date);
this.sourceName = sourceName;
}
@Override
public void process(FloatDataRecord record) {
float[] values = record.getFloatData();
int j = 0;
for (Long pfaf : orderedPfafs) {
FFMPGuidanceBasin basin = (FFMPGuidanceBasin) basins.get(pfaf);
protected void applyValue(FFMPBasin basin, float value) {
FFMPGuidanceBasin gBasin = (FFMPGuidanceBasin) basin;
if (basin == null) {
basin = new FFMPGuidanceBasin(pfaf, aggregate);
basins.put(pfaf, basin);
Float curval = gBasin.getValue(date, sourceName);
if (curval != FFMPUtils.MISSING || !curval.isNaN()) {
if (curval >= 0.0f && value >= 0.0f) {
gBasin.setValue(sourceName, date, (curval + value) / 2);
} else if (value >= 0.0f) {
gBasin.setValue(sourceName, date, value);
}
Float curval = basin.getValue(date, sourceName);
if (curval != FFMPUtils.MISSING || !curval.isNaN()) {
if (curval >= 0.0f && values[j] >= 0.0f) {
basin.setValue(sourceName, date,
(curval + values[j]) / 2);
} else if (values[j] >= 0.0f) {
basin.setValue(sourceName, date, values[j]);
}
// do not overwrite original value
} else {
basin.setValue(sourceName, date, values[j]);
}
j++;
// do not overwrite original value
} else {
gBasin.setValue(sourceName, date, value);
}
}
@ -751,39 +784,16 @@ public class FFMPBasinData implements ISerializableObject {
/**
* Task for loading data from a dataRecord into FFMPVirtualGageBasins
*/
private class LoadVirtualMapTask extends LoadTask {
protected final Date date;
protected final Collection<FFMPVirtualGageBasinMetaData> orderedMetadata;
private class LoadVirtualMapTask extends LoadMapTask {
public LoadVirtualMapTask(File datastoreFile, String datasetGroupPath,
Date date,
Collection<FFMPVirtualGageBasinMetaData> orderedMetadata) {
super(datastoreFile, datasetGroupPath);
this.date = date;
this.orderedMetadata = orderedMetadata;
FFMPBasin[] basins, Date date) {
super(datastoreFile, datasetGroupPath, basins, date);
}
@Override
public void process(FloatDataRecord record) {
boolean aggregate = false;
float[] values = record.getFloatData();
if (values != null) {
int j = 0;
for (FFMPVirtualGageBasinMetaData fvgbmd : orderedMetadata) {
FFMPVirtualGageBasin vgbasin = (FFMPVirtualGageBasin) basins
.get(fvgbmd.getLookupId());
if (vgbasin == null) {
vgbasin = new FFMPVirtualGageBasin(fvgbmd.getLid(),
fvgbmd.getLookupId(), aggregate);
basins.put(fvgbmd.getLookupId(), vgbasin);
}
vgbasin.setValue(date, values[j]);
j++;
}
}
protected void applyValue(FFMPBasin basin, float value) {
basin.setValue(date, value);
}
}

View file

@ -393,32 +393,26 @@ public class FFMPRecord extends PersistablePluginDataObject
FFMPTemplates template, String huc, Date date, String sourceName)
throws Exception {
FFMPBasinData fbd = getBasinData(huc);
ImmutableDate idate = getCacheDate(date);
boolean aggregate = true;
if (huc.equals(ALL)) {
aggregate = false;
}
FFMPBasinData fbd = getBasinData(huc);
ImmutableDate idate = getCacheDate(date);
for (DomainXML domain : template.getDomains()) {
LinkedHashMap<Long, ?> map = template.getMap(getSiteKey(), domain.getCwa(), huc);
LinkedHashMap<Long, ?> map = template.getMap(getSiteKey(),
domain.getCwa(), huc);
if (map != null && !map.isEmpty()) {
fbd.addBasins(datastoreFile, uri
+ DataStoreFactory.DEF_SEPARATOR + domain.getCwa()
+ DataStoreFactory.DEF_SEPARATOR
+ huc, sourceName, idate, map.keySet(), aggregate);
fbd.addBasins(datastoreFile, uri, getSiteKey(),
domain.getCwa(), huc, sourceName, idate, map.keySet(),
aggregate);
}
}
// TODO in the future if we can not loadNow then the basinData can get
// really bulk data retrieval which will help performance. Unfortunately
// at this time there is no way to guarantee that the load will not
// happen on the UI thread.
fbd.loadNow();
}
public void retrieveMapFromDataStore(FFMPTemplates template, String huc)
@ -538,19 +532,12 @@ public class FFMPRecord extends PersistablePluginDataObject
int size = lids.size();
if (size > 0) {
fbd.addVirtualBasins(datastoreFile, uri
+ DataStoreFactory.DEF_SEPARATOR + domain.getCwa()
+ DataStoreFactory.DEF_SEPARATOR + ALL, idate,
fbd.addVirtualBasins(datastoreFile, uri, key,
domain.getCwa(), idate,
lids.values());
}
}
}
// TODO in the future if we can not loadNow then the basinData can get
// really bulk data retrieval which will help performance. Unfortunately
// at this time there is no way to guarantee that the load will not
// happen on the UI thread.
fbd.loadNow();
}
/**