Issue #1912 Initial bulk hdf5 access for ffmp

Change-Id: If2b9afbd23d0388c16f05ddaaf47a1ac8dc6b339

Former-commit-id: 4db55d9a58 [formerly 972ca525d04ca13eb43816bdfeb9bf4fb4786d76]
Former-commit-id: fada8ae7dd
Ben Steffensmeier 2013-04-16 15:14:38 -05:00
parent f2e4d0c743
commit 84ee23cc58
7 changed files with 512 additions and 283 deletions

View file: FFMPMonitor.java

@ -1,7 +1,6 @@
package com.raytheon.uf.viz.monitor.ffmp;
import java.io.File;
import java.io.FileNotFoundException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
@ -36,7 +35,6 @@ import com.raytheon.uf.common.dataplugin.ffmp.FFMPTemplates.MODE;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPVirtualGageBasin;
import com.raytheon.uf.common.datastorage.DataStoreFactory;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.datastorage.StorageException;
import com.raytheon.uf.common.monitor.config.FFFGDataMgr;
import com.raytheon.uf.common.monitor.config.FFMPRunConfigurationManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager;
@ -94,6 +92,7 @@ import com.raytheon.uf.viz.monitor.listeners.IMonitorListener;
* 02/20/13 1635 D. Hladky Fixed multi guidance sources
* Mar 6, 2013 1769 dhladky Changed threading to use count down latch.
* Apr 9, 2013 1890 dhladky Fixed the broken cache file load
* Apr 16, 2013 1912 bsteffen Initial bulk hdf5 access for ffmp
*
* </pre>
*
@ -467,7 +466,7 @@ public class FFMPMonitor extends ResourceMonitor {
if (sourceXML.getSourceType().equals(
SOURCE_TYPE.GAGE.getSourceType())
&& phuc.equals(FFMPRecord.ALL)) {
ffmpRec.retrieveVirtualBasinFromDataStore(dataStore,
ffmpRec.retrieveVirtualBasinFromDataStore(loc,
dataUri, getTemplates(siteKey), ffmpRec
.getDataTime().getRefTime(), basin);
} else {
@ -2189,16 +2188,20 @@ public class FFMPMonitor extends ResourceMonitor {
List<String> uris = getLoadedUris(fsiteKey, fsource, fhuc);
String dataUri = fffmpRec.getDataURI();
if (!uris.contains(dataUri)) {
Date refTime = fffmpRec.getDataTime().getRefTime();
File loc = HDF5Util.findHDF5Location(fffmpRec);
IDataStore dataStore = DataStoreFactory.getDataStore(loc);
FFMPSiteData siteData = siteDataMap.get(fsiteKey);
String mySource = fsource;
boolean isGageSource = false;
SourceXML source = fscm.getSource(fsource);
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
mySource = source.getDisplayName();
} else if (source.getSourceType().equals(
SOURCE_TYPE.GAGE.getSourceType())) {
isGageSource = true;
}
FFMPSourceData sourceData = siteData
@ -2207,55 +2210,36 @@ public class FFMPMonitor extends ResourceMonitor {
if (curRecord == null) {
// ensure the record can only be set once
synchronized (siteDataMap) {
curRecord = siteDataMap.get(fsiteKey)
.getSourceData(mySource).getRecord();
siteData = siteDataMap.get(fsiteKey);
sourceData = siteData.getSourceData(mySource);
curRecord = sourceData.getRecord();
if (curRecord == null) {
curRecord = new FFMPRecord(dataUri);
siteDataMap.get(fsiteKey)
.getSourceData(mySource)
.setRecord(curRecord);
sourceData.setRecord(curRecord);
}
}
}
SourceXML sourceXML = fscm.getSource(mySource);
try {
if ((sourceXML != null)
&& sourceXML.getSourceType().equals(
SOURCE_TYPE.GAGE.getSourceType())
&& fhuc.equals(FFMPRecord.ALL)) {
try {
curRecord.retrieveVirtualMapFromDataStore(
dataStore, dataUri, getTemplates(fsiteKey),
fffmpRec.getDataTime().getRefTime(),
if (isGageSource && fhuc.equals(FFMPRecord.ALL)) {
curRecord.retrieveVirtualMapFromDataStore(loc,
dataUri, getTemplates(fsiteKey), refTime,
fffmpRec.getSourceName());
} catch (FileNotFoundException e) {
statusHandler.handle(Priority.PROBLEM,
"FFMP Can't find FFMP URI, " + dataUri, e);
} catch (StorageException e) {
statusHandler.handle(Priority.PROBLEM,
"FFMP Can't retrieve (Storage problem) FFMP URI, "
+ dataUri, e);
}
} else {
try {
} else {
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.handle(Priority.DEBUG,
"Retrieving and Populating URI: , "
+ dataUri);
}
curRecord.retrieveMapFromDataStore(dataStore,
dataUri,
curRecord.retrieveMapFromDataStore(loc, dataUri,
getTemplates(fffmpRec.getSiteKey()), fhuc,
fffmpRec.getDataTime().getRefTime(),
fffmpRec.getSourceName());
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"FFMP Can't retrieve FFMP URI, " + dataUri,
e);
refTime, fffmpRec.getSourceName());
}
} catch (Exception e) {
statusHandler.handle(Priority.PROBLEM,
"FFMP Can't retrieve FFMP URI, " + dataUri, e);
}
sourceData.addLoadedUri(fhuc, dataUri);
}
}
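For reference, the consolidated CAVE-side load path that results from this hunk; the local variables (loc, refTime, isGageSource, curRecord, dataUri, fhuc, fsiteKey, fffmpRec), the template lookups, and the surrounding try/catch are exactly as shown above, so this is a reading aid rather than new behavior:

Date refTime = fffmpRec.getDataTime().getRefTime();
File loc = HDF5Util.findHDF5Location(fffmpRec); // HDF5 file backing this record
if (isGageSource && fhuc.equals(FFMPRecord.ALL)) {
    // virtual gage basins are read from the ALL dataset
    curRecord.retrieveVirtualMapFromDataStore(loc, dataUri,
            getTemplates(fsiteKey), refTime, fffmpRec.getSourceName());
} else {
    // all other sources queue LoadTasks and are read in bulk
    curRecord.retrieveMapFromDataStore(loc, dataUri,
            getTemplates(fffmpRec.getSiteKey()), fhuc, refTime,
            fffmpRec.getSourceName());
}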

View file: FFMPBasinData.java

@ -19,11 +19,19 @@
**/
package com.raytheon.uf.common.dataplugin.ffmp;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPDataRecordLoader.LoadTask;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager.SOURCE_TYPE;
import com.raytheon.uf.common.monitor.xml.SourceXML;
import com.raytheon.uf.common.serialization.ISerializableObject;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@ -40,6 +48,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* 06/22/09 2152 D. Hladky Initial release
* 01/27/13 1478 D. Hladky Added support for write of aggregate record cache
* 01/27/13 1569 D. Hladky Added support for write of aggregate record cache
* Apr 16, 2013 1912 bsteffen Initial bulk hdf5 access for ffmp
*
* </pre>
*
@ -61,7 +70,9 @@ public class FFMPBasinData implements ISerializableObject {
private String hucLevel;
@DynamicSerializeElement
private HashMap<Long, FFMPBasin> basins = new HashMap<Long, FFMPBasin>();
private Map<Long, FFMPBasin> basins = new HashMap<Long, FFMPBasin>();
private List<LoadTask> tasks = new ArrayList<LoadTask>();
/**
* Public one arg constructor
@ -84,7 +95,10 @@ public class FFMPBasinData implements ISerializableObject {
*
* @return
*/
public HashMap<Long, FFMPBasin> getBasins() {
public Map<Long, FFMPBasin> getBasins() {
if (!tasks.isEmpty()) {
loadNow();
}
return basins;
}
@ -94,6 +108,11 @@ public class FFMPBasinData implements ISerializableObject {
* @param basins
*/
public void setBasins(HashMap<Long, FFMPBasin> basins) {
if (!tasks.isEmpty()) {
synchronized (tasks) {
tasks.clear();
}
}
this.basins = basins;
}
@ -120,7 +139,7 @@ public class FFMPBasinData implements ISerializableObject {
* @param basin
*/
public void put(Long key, FFMPBasin basin) {
basins.put(key, basin);
getBasins().put(key, basin);
}
/**
@ -130,7 +149,7 @@ public class FFMPBasinData implements ISerializableObject {
* @return
*/
public FFMPBasin get(Long key) {
return basins.get(key);
return getBasins().get(key);
}
/**
@ -138,9 +157,8 @@ public class FFMPBasinData implements ISerializableObject {
*
* @return
*/
public ArrayList<Long> getPfafIds() {
ArrayList<Long> pfafs = new ArrayList<Long>(basins.keySet());
return pfafs;
public List<Long> getPfafIds() {
return new ArrayList<Long>(getBasins().keySet());
}
/**
@ -155,7 +173,7 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
int i = 0;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
tvalue += basin.getValue(beforeDate, afterDate);
i++;
@ -180,7 +198,7 @@ public class FFMPBasinData implements ISerializableObject {
float tarea = 0.0f;
int i = 0;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
if (basin.getValue() != FFMPUtils.MISSING) {
tvalue += (basin.getValue() * areas.get(i));
@ -210,7 +228,7 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
int i = 0;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
tvalue += basin.getAccumValue(beforeDate, afterDate,
expirationTime, rate);
@ -234,7 +252,7 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
Float value = basin.getValue(beforeDate, afterDate);
if (value > tvalue) {
@ -244,9 +262,10 @@ public class FFMPBasinData implements ISerializableObject {
}
return tvalue;
}
/**
* Used for mosaic sources
*
* @param pfaf_ids
* @param date
* @param expiration
@ -257,7 +276,7 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
Float value = basin.getAverageValue(date, expiration);
if (value > tvalue) {
@ -267,9 +286,10 @@ public class FFMPBasinData implements ISerializableObject {
}
return tvalue;
}
/**
* Used for mosaic sources
*
* @param pfaf_ids
* @param date
* @param expiration
@ -280,7 +300,7 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
Float value = basin.getAverageValue(afterDate, beforeDate);
if (value > tvalue) {
@ -302,7 +322,7 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
Float value = basin.getValue(date);
if (value > tvalue) {
@ -328,7 +348,7 @@ public class FFMPBasinData implements ISerializableObject {
float value;
int i = 0;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin == null) {
return guidance;
@ -408,7 +428,7 @@ public class FFMPBasinData implements ISerializableObject {
long parentPfaf) {
float tvalue = Float.NaN;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
FFMPGuidanceBasin fgb = (FFMPGuidanceBasin) basin;
fgb.setCountyFips(parentPfaf);
@ -463,18 +483,18 @@ public class FFMPBasinData implements ISerializableObject {
float tvalue = 0.0f;
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
float val = basin.getAccumValue(afterDate, beforeDate,
expirationTime, rate);
if (val > tvalue) {
tvalue = val;
}
}
}
return tvalue;
}
@ -489,7 +509,7 @@ public class FFMPBasinData implements ISerializableObject {
FFMPGuidanceInterpolation interpolation, long expiration) {
List<Float> values = new ArrayList<Float>();
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
FFMPGuidanceBasin fgb = (FFMPGuidanceBasin) basin;
if (interpolation.isInterpolate()) {
@ -516,11 +536,11 @@ public class FFMPBasinData implements ISerializableObject {
* @param rate
* @return
*/
public List<Float> getAccumValues(List<Long> pfaf_ids,
Date beforeDate, Date afterDate, long expirationTime, boolean rate) {
public List<Float> getAccumValues(List<Long> pfaf_ids, Date beforeDate,
Date afterDate, long expirationTime, boolean rate) {
List<Float> values = new ArrayList<Float>();
for (Long pfaf : pfaf_ids) {
FFMPBasin basin = basins.get(pfaf);
FFMPBasin basin = getBasins().get(pfaf);
if (basin != null) {
values.add(basin.getAccumValue(beforeDate, afterDate,
expirationTime, rate));
@ -535,18 +555,18 @@ public class FFMPBasinData implements ISerializableObject {
* @param date
*/
public void purgeData(Date date) {
for (FFMPBasin basin : basins.values()) {
for (FFMPBasin basin : getBasins().values()) {
basin.purgeData(date);
}
}
/**
* deserialize data from the aggregate record
*
* @param times
*/
public void populate(List<Long> times) {
for (FFMPBasin basin : basins.values()) {
for (FFMPBasin basin : getBasins().values()) {
basin.deserialize(times);
}
}
@ -555,9 +575,217 @@ public class FFMPBasinData implements ISerializableObject {
* populates the serialized array/objects
*/
public void serialize() {
for (FFMPBasin basin : basins.values()) {
for (FFMPBasin basin : getBasins().values()) {
basin.serialize();
}
}
/**
* Add basins from datasetGroupPath in datastoreFile. The basins will not be
* loaded immediately; they will be loaded when they are needed.
*
* @param datastoreFile
* - the file containing data.
* @param datasetGroupPath
* - the datasetGroupPath where the data is
* @param sourceName
* - the sourceName for the data.
* @param date
* - the date of the data.
* @param orderedPfafs
* - a collection of Longs which is in the same order as the data
* in the dataStore.
* @param aggregate
* - whether the basins are aggregate basins (true for any HUC level other than ALL)
*/
public void addBasins(File datastoreFile, String datasetGroupPath,
String sourceName, Date date, Collection<Long> orderedPfafs,
boolean aggregate) {
synchronized (tasks) {
// clone orderedPfafs to protect from concurrency issues.
orderedPfafs = new ArrayList<Long>(orderedPfafs);
SourceXML source = FFMPSourceConfigurationManager.getInstance()
.getSource(sourceName);
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
tasks.add(new LoadGuidanceMapTask(datastoreFile,
datasetGroupPath, orderedPfafs, date, aggregate,
sourceName));
} else {
tasks.add(new LoadMapTask(datastoreFile, datasetGroupPath,
orderedPfafs, date, aggregate));
}
}
}
/**
* Add virtual basins from datasetGroupPath in datastoreFile. The basins
* will not be loaded immediately; they will be loaded when they are needed.
*
* @param datastoreFile
* - the file containing data.
* @param datasetGroupPath
* - the datasetGroupPath where the data is
* @param date
* - the date of the data.
* @param orderedMetadata
* - a collection of FFMPVirtualGageBasinMetaData which is in the
* same order as the data in the dataStore.
*/
public void addVirtualBasins(File datastoreFile, String datasetGroupPath,
Date date, Collection<FFMPVirtualGageBasinMetaData> orderedMetadata) {
// clone ordered metadata to protect from concurrency issues.
synchronized (tasks) {
orderedMetadata = new ArrayList<FFMPVirtualGageBasinMetaData>(
orderedMetadata);
tasks.add(new LoadVirtualMapTask(datastoreFile, datasetGroupPath,
date, orderedMetadata));
}
}
public void loadNow() {
synchronized (tasks) {
if (!tasks.isEmpty()) {
System.out.println("Loading tasks: " + tasks.size() + " "
+ this);
FFMPDataRecordLoader.loadRecords(tasks);
tasks.clear();
}
}
}
/**
* Base task for loading data from a dataRecord into FFMPBasins
*/
private class LoadMapTask extends LoadTask {
protected final Collection<Long> orderedPfafs;
protected final Date date;
protected final boolean aggregate;
public LoadMapTask(File datastoreFile, String datasetGroupPath,
Collection<Long> orderedPfafs, Date date, boolean aggregate) {
super(datastoreFile, datasetGroupPath);
this.orderedPfafs = orderedPfafs;
this.date = date;
this.aggregate = aggregate;
}
@Override
public void process(FloatDataRecord record) {
float[] values = record.getFloatData();
int j = 0;
for (Long pfaf : orderedPfafs) {
FFMPBasin basin = basins.get(pfaf);
if (basin == null) {
basin = new FFMPBasin(pfaf, aggregate);
basins.put(pfaf, basin);
}
if (basin.contains(date)) {
float curval = basin.getValue(date);
if (curval >= 0.0f && values[j] >= 0.0f) {
basin.setValue(date, (curval + values[j]) / 2);
} else if (values[j] >= 0.0f) {
basin.setValue(date, values[j]);
} // do not overwrite original value
} else {
// no value at time exists, write regardless
basin.setValue(date, values[j]);
}
j++;
}
}
}
/**
* Task for loading data from a dataRecord into FFMPGuidanceBasins
*/
private class LoadGuidanceMapTask extends LoadMapTask {
private final String sourceName;
public LoadGuidanceMapTask(File datastoreFile, String datasetGroupPath,
Collection<Long> orderedPfafs, Date date, boolean aggregate,
String sourceName) {
super(datastoreFile, datasetGroupPath, orderedPfafs, date,
aggregate);
this.sourceName = sourceName;
}
@Override
public void process(FloatDataRecord record) {
float[] values = record.getFloatData();
int j = 0;
for (Long pfaf : orderedPfafs) {
FFMPGuidanceBasin basin = (FFMPGuidanceBasin) basins.get(pfaf);
if (basin == null) {
basin = new FFMPGuidanceBasin(pfaf, aggregate);
basins.put(pfaf, basin);
}
Float curval = basin.getValue(date, sourceName);
if (curval != FFMPUtils.MISSING || !curval.isNaN()) {
if (curval >= 0.0f && values[j] >= 0.0f) {
basin.setValue(sourceName, date,
(curval + values[j]) / 2);
} else if (values[j] >= 0.0f) {
basin.setValue(sourceName, date, values[j]);
}
// do not overwrite original value
} else {
basin.setValue(sourceName, date, values[j]);
}
j++;
}
}
}
/**
* Task for loading data from a dataRecord into FFMPVirtualGageBasins
*/
private class LoadVirtualMapTask extends LoadTask {
protected final Date date;
protected final Collection<FFMPVirtualGageBasinMetaData> orderedMetadata;
public LoadVirtualMapTask(File datastoreFile, String datasetGroupPath,
Date date,
Collection<FFMPVirtualGageBasinMetaData> orderedMetadata) {
super(datastoreFile, datasetGroupPath);
this.date = date;
this.orderedMetadata = orderedMetadata;
}
@Override
public void process(FloatDataRecord record) {
boolean aggregate = false;
float[] values = record.getFloatData();
if (values != null) {
int j = 0;
for (FFMPVirtualGageBasinMetaData fvgbmd : orderedMetadata) {
FFMPVirtualGageBasin vgbasin = (FFMPVirtualGageBasin) basins
.get(fvgbmd.getLookupId());
if (vgbasin == null) {
vgbasin = new FFMPVirtualGageBasin(fvgbmd.getLid(),
fvgbmd.getLookupId(), aggregate);
basins.put(fvgbmd.getLookupId(), vgbasin);
}
vgbasin.setValue(date, values[j]);
j++;
}
}
}
}
}
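The addBasins/addVirtualBasins methods and loadNow() above implement the deferred-load contract described in their javadoc: callers queue reads cheaply, and the HDF5 access happens in bulk on first use of the basins. A hedged usage sketch; the file, dataset group path, source name, and pfaf ids below are placeholders (they must match an actual FFMP HDF5 file and a configured source), not values from this commit:

import java.io.File;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;

import com.raytheon.uf.common.dataplugin.ffmp.FFMPBasin;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPBasinData;

public class DeferredLoadSketch {
    public static Map<Long, FFMPBasin> load() {
        FFMPBasinData hucData = new FFMPBasinData("COUNTY");  // placeholder HUC level
        File h5File = new File("ffmp-sample.h5");             // placeholder file
        // Queue the read; nothing touches the datastore yet.
        hucData.addBasins(h5File, "/someDataUri/OAX/COUNTY",  // placeholder path
                "QPE", new Date(), Arrays.asList(1L, 2L, 3L), true);
        // The first getBasins() call runs loadNow(), which hands every queued
        // LoadTask to FFMPDataRecordLoader for a single bulk read per file.
        return hucData.getBasins();
    }
}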

View file: FFMPDataContainer.java

@ -21,8 +21,8 @@ package com.raytheon.uf.common.dataplugin.ffmp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -46,6 +46,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* 07/31/12 578 D.Hladky finished it
* 09/27/12 DR 15471 G.Zhang Fixed ConcurrentModificationException
* 01/27/13 1478 D. Hladky Re-worked to help with memory size and NAS read write stress
* Apr 16, 2013 1912 bsteffen Initial bulk hdf5 access for ffmp
*
* </pre>
*
* @author dhladky
@ -57,7 +59,7 @@ public class FFMPDataContainer {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(FFMPDataContainer.class);
private final ConcurrentHashMap<String, FFMPBasinData> basinDataMap = new ConcurrentHashMap<String, FFMPBasinData>();// DR
private final Map<String, FFMPBasinData> basinDataMap = new ConcurrentHashMap<String, FFMPBasinData>();// DR
private String sourceName = null;
@ -73,7 +75,7 @@ public class FFMPDataContainer {
*/
public FFMPDataContainer(String sourceName) {
this.sourceName = sourceName;
basinDataMap.put("ALL", new FFMPBasinData("ALL"));
basinDataMap.put(FFMPRecord.ALL, new FFMPBasinData(FFMPRecord.ALL));
// System.out.println("Creating source: " + sourceName);
}
@ -281,9 +283,10 @@ public class FFMPDataContainer {
public boolean containsKey(Date date) {
boolean contains = false;
if (getBasinData("ALL") != null) {
if (getBasinData(FFMPRecord.ALL) != null) {
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL)
.getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -306,7 +309,7 @@ public class FFMPDataContainer {
*/
public boolean containsKey(String sourceName) {
boolean contains = false;
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL).getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -356,7 +359,8 @@ public class FFMPDataContainer {
public double getMaxValue(ArrayList<Long> pfafs, Date backDate,
Date currDate, long expirationTime, boolean rate) {
double val = getBasinData("ALL").getAccumMaxValue(pfafs, currDate,
double val = getBasinData(FFMPRecord.ALL).getAccumMaxValue(pfafs,
currDate,
backDate, expirationTime, rate);
return val;
@ -370,7 +374,8 @@ public class FFMPDataContainer {
public Date getNewest() {
try {
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL)
.getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -397,7 +402,8 @@ public class FFMPDataContainer {
*/
public Date getOldest() {
try {
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL)
.getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -425,7 +431,8 @@ public class FFMPDataContainer {
public List<Date> getOrderedTimes(Date barrierTime) {
ArrayList<Date> orderedTimes = new ArrayList<Date>();
try {
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL)
.getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -454,7 +461,8 @@ public class FFMPDataContainer {
public List<Long> getOrderedTimes() {
ArrayList<Long> orderedTimes = new ArrayList<Long>();
try {
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL)
.getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -547,7 +555,7 @@ public class FFMPDataContainer {
*/
public int size() {
HashMap<Long, FFMPBasin> basins = getBasinData("ALL").getBasins();
Map<Long, FFMPBasin> basins = getBasinData(FFMPRecord.ALL).getBasins();
synchronized (basins) {
for (Entry<Long, FFMPBasin> entry : basins.entrySet()) {
@ -570,7 +578,7 @@ public class FFMPDataContainer {
if(fbd==null || key==null)
return;
HashMap<Long,FFMPBasin> basins = fbd.getBasins();
Map<Long, FFMPBasin> basins = fbd.getBasins();
if(basins == null)
return;
@ -583,7 +591,7 @@ public class FFMPDataContainer {
* Gets the basin data map
* @return
*/
public ConcurrentHashMap<String, FFMPBasinData> getBasinMap() {
public Map<String, FFMPBasinData> getBasinMap() {
return basinDataMap;
}

View file: FFMPDataRecordLoader.java (new file)

@ -0,0 +1,153 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.dataplugin.ffmp;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.raytheon.uf.common.datastorage.DataStoreFactory;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.datastorage.Request;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.datastorage.records.IDataRecord;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
/**
* Implements a bulk retrieval mechanism for FFMP.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 5, 2013 1912 bsteffen Initial creation
*
* </pre>
*
* @author bsteffen
* @version 1.0
*/
public class FFMPDataRecordLoader {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(FFMPDataRecordLoader.class);
// Anything that needs bulk loading will need to extend this task.
public static abstract class LoadTask {
private final File datastoreFile;
private final String datasetGroupPath;
public LoadTask(File datastoreFile, String datasetGroupPath) {
this.datastoreFile = datastoreFile;
this.datasetGroupPath = datasetGroupPath;
}
public abstract void process(FloatDataRecord record);
public File getDatastoreFile() {
return datastoreFile;
}
public String getDatasetGroupPath() {
return datasetGroupPath;
}
}
/**
* Bulk load all records for a set of tasks. Tasks are guaranteed to be
* executed in order but the data records will be loaded as efficiently as
* possible.
*
* @param tasks
*/
public static void loadRecords(List<LoadTask> tasks) {
// sort all the tasks by file.
Map<File, List<LoadTask>> fileMap = new HashMap<File, List<LoadTask>>();
for (LoadTask task : tasks) {
List<LoadTask> taskList = fileMap.get(task.getDatastoreFile());
if (taskList == null) {
taskList = new ArrayList<LoadTask>();
fileMap.put(task.getDatastoreFile(), taskList);
}
taskList.add(task);
}
Map<LoadTask, FloatDataRecord> dataMap = new HashMap<LoadTask, FloatDataRecord>();
// load each file
for (Entry<File, List<LoadTask>> fileEntry : fileMap.entrySet()) {
IDataStore dataStore = DataStoreFactory.getDataStore(fileEntry
.getKey());
List<LoadTask> taskList = fileEntry.getValue();
// assemble all the paths.
String[] datasetGroupPath = new String[taskList.size()];
for (int i = 0; i < datasetGroupPath.length; i += 1) {
datasetGroupPath[i] = taskList.get(i).getDatasetGroupPath();
}
// perform the data request.
IDataRecord[] dataRecords = null;
try {
dataRecords = dataStore.retrieveDatasets(datasetGroupPath,
Request.ALL);
} catch (Exception e) {
// If something went wrong try to retrieve each record
// individually so only records with errors are skipped.
dataRecords = new IDataRecord[datasetGroupPath.length];
for (int i = 0; i < datasetGroupPath.length; i += 1) {
try {
IDataRecord[] drs = dataStore.retrieveDatasets(
new String[] { datasetGroupPath[i] },
Request.ALL);
dataRecords[i] = drs[0];
} catch (Exception e1) {
statusHandler.handle(Priority.DEBUG,
"FFMPRecord: no data record for: "
+ datasetGroupPath[i], e1);
}
}
}
// correlate them in the dataMap.
for (int i = 0; i < dataRecords.length; i += 1) {
if (dataRecords[i] != null
&& dataRecords[i] instanceof FloatDataRecord) {
dataMap.put(taskList.get(i),
(FloatDataRecord) dataRecords[i]);
}
}
}
// execute all tasks.
for (LoadTask task : tasks) {
FloatDataRecord rec = dataMap.get(task);
if (rec != null) {
task.process(rec);
}
}
}
}
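FFMPDataRecordLoader groups the queued tasks by HDF5 file, issues one retrieveDatasets call per file (falling back to per-dataset retrieval if the bulk request fails), and then invokes each task's process() in the original order. An illustrative LoadTask subclass; the file and dataset group paths are placeholders, not values from this commit:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import com.raytheon.uf.common.dataplugin.ffmp.FFMPDataRecordLoader;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPDataRecordLoader.LoadTask;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;

public class PrintSizeTask extends LoadTask {

    public PrintSizeTask(File datastoreFile, String datasetGroupPath) {
        super(datastoreFile, datasetGroupPath);
    }

    @Override
    public void process(FloatDataRecord record) {
        // Called once per task after the bulk (or fallback) retrieval completes.
        System.out.println(getDatasetGroupPath() + ": "
                + record.getFloatData().length + " values");
    }

    public static void main(String[] args) {
        File h5File = new File("ffmp-sample.h5");                     // placeholder
        List<LoadTask> tasks = new ArrayList<LoadTask>();
        tasks.add(new PrintSizeTask(h5File, "/someDataUri/OAX/ALL")); // placeholder paths
        tasks.add(new PrintSizeTask(h5File, "/someDataUri/OAX/COUNTY"));
        // Both datasets share a file, so they are fetched with one bulk call;
        // process() then runs, in list order, for every task whose record was found.
        FFMPDataRecordLoader.loadRecords(tasks);
    }
}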

View file: FFMPRecord.java

@ -27,7 +27,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -47,13 +46,16 @@ import org.hibernate.annotations.Index;
import com.raytheon.uf.common.dataplugin.IDecoderGettable;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.annotations.DataURI;
import com.raytheon.uf.common.dataplugin.persist.IHDFFilePathProvider;
import com.raytheon.uf.common.dataplugin.persist.IPersistable;
import com.raytheon.uf.common.dataplugin.persist.PersistablePluginDataObject;
import com.raytheon.uf.common.datastorage.DataStoreFactory;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.datastorage.Request;
import com.raytheon.uf.common.datastorage.StorageException;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.datastorage.records.IDataRecord;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager.SOURCE_TYPE;
import com.raytheon.uf.common.monitor.xml.DomainXML;
@ -81,6 +83,7 @@ import com.raytheon.uf.common.time.util.ImmutableDate;
* Apr 8, 2013 1293 bkowal Removed references to hdffileid.
* April, 9 2013 1890 dhladky Moved dates to referenced map in record rather than multiple dates in FFMPBasin objs.
* Apr 12, 2013 1857 bgonzale Added SequenceGenerator annotation.
* Apr 16, 2013 1912 bsteffen Initial bulk hdf5 access for ffmp
*
* </pre>
*
@ -385,115 +388,53 @@ public class FFMPRecord extends PersistablePluginDataObject
* @param dataStore
* @param huc
*/
public void retrieveMapFromDataStore(IDataStore dataStore, String uri,
public void retrieveMapFromDataStore(File datastoreFile, String uri,
FFMPTemplates template, String huc, Date date, String sourceName)
throws Exception {
FFMPBasinData fbd = null;
boolean aggregate = true;
if (huc.equals(ALL)) {
aggregate = false;
}
fbd = getBasinData(huc);
FFMPBasinData fbd = getBasinData(huc);
ImmutableDate idate = getCacheDate(date);
SourceXML source = FFMPSourceConfigurationManager.getInstance()
.getSource(sourceName);
for (DomainXML domain : template.getDomains()) {
LinkedHashMap<Long, ?> map = template.getMap(getSiteKey(), domain.getCwa(), huc);
if (map != null && !map.isEmpty()) {
IDataRecord rec = null;
try {
rec = dataStore.retrieve(uri + "/" + domain.getCwa(), huc,
Request.ALL);
} catch (Exception e) {
statusHandler.handle(Priority.DEBUG,
"FFMPRecord: no data record for: " + uri + "/"
+ domain.getCwa());
}
if (rec != null) {
float[] values = ((FloatDataRecord) rec).getFloatData();
int j = 0;
if (values != null) {
// System.err.println(sourceName);
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
for (Long pfaf : map.keySet()) {
try {
FFMPGuidanceBasin basin = (FFMPGuidanceBasin) fbd
.get(pfaf);
if (basin == null) {
basin = new FFMPGuidanceBasin(pfaf,
aggregate);
fbd.put(pfaf, basin);
}
Float curval = basin.getValue(idate,
sourceName);
if (curval != FFMPUtils.MISSING
|| !curval.isNaN()) {
if (curval >= 0.0f && values[j] >= 0.0f) {
basin.setValue(sourceName, idate,
(curval + values[j]) / 2);
} else if (values[j] >= 0.0f){
basin.setValue(sourceName, idate,
values[j]);
}
// do not overwrite original value
} else {
basin.setValue(sourceName, idate,
values[j]);
}
j++;
} catch (Exception e) {
break;
}
}
} else {
for (Long pfaf : map.keySet()) {
try {
FFMPBasin basin = fbd.get(pfaf);
if (basin == null) {
basin = new FFMPBasin(pfaf, aggregate);
fbd.put(pfaf, basin);
}
if (basin.contains(idate)) {
float curval = basin.getValue(idate);
if (curval >= 0.0f && values[j] >= 0.0f) {
basin.setValue(idate,
(curval + values[j]) / 2);
} else if (values[j] >= 0.0f) {
basin.setValue(idate, values[j]);
} // do not overwrite original value
} else {
// no value at time exists, write regardless
basin.setValue(idate, values[j]);
}
j++;
} catch (Exception e) {
break;
}
}
}
}
}
fbd.addBasins(datastoreFile, uri
+ DataStoreFactory.DEF_SEPARATOR + domain.getCwa()
+ DataStoreFactory.DEF_SEPARATOR
+ huc, sourceName, idate, map.keySet(), aggregate);
}
}
// TODO: in the future, if we can avoid calling loadNow then the basinData
// can use truly bulk data retrieval, which will help performance.
// Unfortunately, at this time there is no way to guarantee that the load
// will not happen on the UI thread.
fbd.loadNow();
}
public void retrieveMapFromDataStore(FFMPTemplates template, String huc)
throws Exception {
retrieveMapFromDataStore(getDataStoreFile(), getDataURI(), template,
huc, getDataTime().getRefTime(), getSourceName());
}
private File getDataStoreFile() {
IHDFFilePathProvider pathProvider = getHDFPathProvider();
String path = pathProvider.getHDFPath(getPluginName(), this);
String fileName = pathProvider.getHDFFileName(getPluginName(), this);
File datastoreFile = new File(getPluginName() + IPathManager.SEPARATOR
+ path + IPathManager.SEPARATOR + fileName);
return datastoreFile;
}
/**
@ -529,7 +470,8 @@ public class FFMPRecord extends PersistablePluginDataObject
try {
IDataRecord rec = dataStore.retrieve(
uri + "/" + domain.getCwa(), huc,
uri + DataStoreFactory.DEF_SEPARATOR
+ domain.getCwa(), huc,
Request.buildPointRequest(new Point(index, 0)));
if (rec != null) {
@ -573,12 +515,10 @@ public class FFMPRecord extends PersistablePluginDataObject
* @param dataStore
* @param huc
*/
public void retrieveVirtualMapFromDataStore(IDataStore dataStore,
String uri, FFMPTemplates template, Date date, String sourceName)
public void retrieveVirtualMapFromDataStore(File datastoreFile, String uri,
FFMPTemplates template, Date date, String sourceName)
throws StorageException, FileNotFoundException {
FFMPBasinData fbd = null;
boolean aggregate = false;
fbd = getBasinData(ALL);
FFMPBasinData fbd = getBasinData(ALL);
String key = getDataKey();
ImmutableDate idate = getCacheDate(date);
@ -591,49 +531,19 @@ public class FFMPRecord extends PersistablePluginDataObject
int size = lids.size();
if (size > 0) {
IDataRecord rec = null;
try {
rec = dataStore.retrieve(uri + "/" + domain.getCwa(),
ALL, Request.ALL);
} catch (Exception e) {
// This is a routine error. Sometimes you can not have
// data for a configured source
// This suppresses spurious messages that would inflate
// the logs needlessly.
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
statusHandler.handle(Priority.DEBUG,
"FFMPRecord: no data for: " + uri + "/"
+ domain.getCwa());
}
}
if (rec != null) {
float[] values = ((FloatDataRecord) rec).getFloatData();
if (values != null) {
int j = 0;
for (Entry<String, FFMPVirtualGageBasinMetaData> entry : lids
.entrySet()) {
FFMPVirtualGageBasinMetaData fvgbmd = entry
.getValue();
FFMPVirtualGageBasin vgbasin = (FFMPVirtualGageBasin) fbd
.get(fvgbmd.getLookupId());
if (vgbasin == null) {
vgbasin = new FFMPVirtualGageBasin(
fvgbmd.getLid(),
fvgbmd.getLookupId(), aggregate);
fbd.put(fvgbmd.getLookupId(), vgbasin);
}
vgbasin.setValue(idate, values[j]);
j++;
}
}
}
fbd.addVirtualBasins(datastoreFile, uri
+ DataStoreFactory.DEF_SEPARATOR + domain.getCwa()
+ DataStoreFactory.DEF_SEPARATOR + ALL, idate,
lids.values());
}
}
}
// TODO: in the future, if we can avoid calling loadNow then the basinData
// can use truly bulk data retrieval, which will help performance.
// Unfortunately, at this time there is no way to guarantee that the load
// will not happen on the UI thread.
fbd.loadNow();
}
/**
@ -642,59 +552,12 @@ public class FFMPRecord extends PersistablePluginDataObject
* @param dataStore
* @param huc
*/
public void retrieveVirtualBasinFromDataStore(IDataStore dataStore,
public void retrieveVirtualBasinFromDataStore(File datastoreFile,
String uri, FFMPTemplates template, Date date, FFMPBasin basin) {
FFMPBasinData fbd = null;
try {
boolean aggregate = false;
fbd = getBasinData(ALL);
String key = getDataKey();
ImmutableDate idate = getCacheDate(date);
for (DomainXML domain : template.getDomains()) {
LinkedHashMap<String, FFMPVirtualGageBasinMetaData> lids = template.getVirtualGageBasins(key, domain.getCwa());
int size = lids.size();
if (size > 0) {
try {
IDataRecord rec = dataStore
.retrieve(uri + "/" + domain.getCwa(), ALL,
Request.ALL);
if (rec != null) {
float[] values = ((FloatDataRecord) rec)
.getFloatData();
if (values != null) {
int j = 0;
for (Entry<String, FFMPVirtualGageBasinMetaData> entry : lids
.entrySet()) {
FFMPVirtualGageBasinMetaData fvgbmd = entry
.getValue();
FFMPVirtualGageBasin vgbasin = (FFMPVirtualGageBasin) fbd
.get(fvgbmd.getLookupId());
if (vgbasin == null) {
vgbasin = new FFMPVirtualGageBasin(
fvgbmd.getLid(),
fvgbmd.getLookupId(), aggregate);
fbd.put(fvgbmd.getLookupId(), vgbasin);
}
vgbasin.setValue(idate, values[j]);
j++;
}
}
}
}
catch (Throwable e) {
statusHandler.handle(
Priority.PROBLEM,
"ERROR Retrieving Virtual ..."
+ domain.getCwa() + " : " + ALL);
}
}
}
// Should this be retrieving a single basin instead of all of them?
retrieveVirtualMapFromDataStore(datastoreFile, uri, template, date,
uri);
} catch (Throwable e) {
statusHandler.handle(Priority.ERROR, "ERROR Retrieving Virtual..."
+ ALL);
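FFMPRecord now assembles the HDF5 dataset group paths itself (dataURI, then CWA, then HUC, joined with DataStoreFactory.DEF_SEPARATOR) and hands them to FFMPBasinData instead of reading through an IDataStore directly. A small sketch of that path construction; the dataURI, CWA, and HUC values are placeholders:

import com.raytheon.uf.common.datastorage.DataStoreFactory;

public class DatasetPathSketch {
    public static String groupPath(String dataUri, String cwa, String huc) {
        return dataUri + DataStoreFactory.DEF_SEPARATOR + cwa
                + DataStoreFactory.DEF_SEPARATOR + huc;
    }

    public static void main(String[] args) {
        // e.g. "/someDataUri" + "/OAX" + "/COUNTY" when DEF_SEPARATOR is "/"
        System.out.println(groupPath("/someDataUri", "OAX", "COUNTY"));
    }
}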

View file: FFMPGeometryFactory.java

@ -35,7 +35,6 @@ import com.raytheon.uf.common.dataaccess.impl.AbstractDataPluginFactory;
import com.raytheon.uf.common.dataaccess.impl.DefaultGeometryData;
import com.raytheon.uf.common.dataaccess.util.DatabaseQueryUtil;
import com.raytheon.uf.common.dataaccess.util.DatabaseQueryUtil.QUERY_MODE;
import com.raytheon.uf.common.dataaccess.util.PDOUtil;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPBasin;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPBasinData;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPRecord;
@ -45,7 +44,6 @@ import com.raytheon.uf.common.dataplugin.ffmp.HucLevelGeometriesFactory;
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
import com.raytheon.uf.common.dataquery.requests.RequestConstraint.ConstraintType;
import com.raytheon.uf.common.dataquery.responses.DbQueryResponse;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.monitor.config.FFMPRunConfigurationManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager;
import com.raytheon.uf.common.monitor.xml.DomainXML;
@ -65,6 +63,7 @@ import com.vividsolutions.jts.geom.Geometry;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 24, 2013 1552 mpduff Initial creation
* Apr 16, 2013 1912 bsteffen Initial bulk hdf5 access for ffmp
*
* </pre>
*
@ -127,12 +126,8 @@ public class FFMPGeometryFactory extends AbstractDataPluginFactory {
for (Map.Entry<String, Object> es : map.entrySet()) {
FFMPRecord rec = (FFMPRecord) es.getValue();
try {
IDataStore dataStore = PDOUtil.getDataStore(rec);
rec.retrieveMapFromDataStore(dataStore, rec.getDataURI(),
templates,
(String) request.getIdentifiers().get(HUC), rec
.getDataTime().getRefTime(), rec
.getSourceName());
rec.retrieveMapFromDataStore(templates, (String) request
.getIdentifiers().get(HUC));
} catch (Exception e) {
throw new DataRetrievalException(
"Failed to retrieve the IDataRecord for PluginDataObject: "
@ -211,7 +206,7 @@ public class FFMPGeometryFactory extends AbstractDataPluginFactory {
FFMPBasinData basinData = rec.getBasinData(huc);
HashMap<Long, FFMPBasin> basinDataMap = basinData.getBasins();
Map<Long, FFMPBasin> basinDataMap = basinData.getBasins();
HucLevelGeometriesFactory geomFactory = HucLevelGeometriesFactory
.getInstance();

View file: FFTIProcessor.java

@ -31,7 +31,6 @@ import com.raytheon.uf.common.dataplugin.ffmp.FFMPRecord;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPTemplates;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPUtils;
import com.raytheon.uf.common.dataplugin.ffmp.dao.FFMPDao;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager;
import com.raytheon.uf.common.monitor.config.FFMPSourceConfigurationManager.SOURCE_TYPE;
import com.raytheon.uf.common.monitor.xml.FFTISourceXML;
@ -56,6 +55,7 @@ import com.raytheon.uf.edex.plugin.ffmp.FFMPGenerator;
* July 11, 2012 dhladky Edited for FFTI work
* 02/01/13 1569 D. Hladky Added constants, records writing switched to pypies
* </pre>
* Apr 16, 2013 1912 bsteffen Initial bulk hdf5 access for ffmp
*
* @author dhladky
* @version 1.0
@ -320,10 +320,8 @@ public class FFTIProcessor {
FFMPDao dao = (FFMPDao) PluginFactory.getInstance().getPluginDao(
rec.getPluginName());
rec = (FFMPRecord) dao.getMetadata(rec.getDataURI());
IDataStore dataStore = dao.getDataStore(rec);
rec.retrieveMapFromDataStore(dataStore, rec.getDataURI(), template,
huc, rec.getDataTime().getRefTime(), rec.getSourceName());
rec.retrieveMapFromDataStore(template, huc);
// System.out.println("Size of huc: "
// + rec.getBasinData(huc).getBasins().size());