Issue #1782 - restore FFPMGenerator.java

Former-commit-id: 813033f4a2b30a669646b288eb00959922f0e804
This commit is contained in:
Steve Harris 2013-03-28 16:06:54 -05:00
parent a308b27c2f
commit 3953f8368b

View file

@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.regex.Pattern;
import com.raytheon.edex.msg.DataURINotificationMessage;
import com.raytheon.edex.plugin.radar.dao.RadarStationDao;
import com.raytheon.edex.urifilter.URIFilter;
import com.raytheon.edex.urifilter.URIGenerateMessage;
@ -47,7 +48,6 @@ import com.raytheon.uf.common.dataplugin.ffmp.FFMPTemplates.MODE;
import com.raytheon.uf.common.dataplugin.ffmp.FFMPUtils;
import com.raytheon.uf.common.dataplugin.ffmp.SourceBinList;
import com.raytheon.uf.common.dataplugin.ffmp.dao.FFMPDao;
import com.raytheon.uf.common.dataplugin.message.DataURINotificationMessage;
import com.raytheon.uf.common.dataplugin.radar.RadarStation;
import com.raytheon.uf.common.dataplugin.radar.util.RadarsInUseUtil;
import com.raytheon.uf.common.datastorage.DataStoreFactory;
@ -58,7 +58,6 @@ import com.raytheon.uf.common.datastorage.StorageProperties;
import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
import com.raytheon.uf.common.datastorage.records.ByteDataRecord;
import com.raytheon.uf.common.datastorage.records.IDataRecord;
import com.raytheon.uf.common.event.EventBus;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
@ -83,7 +82,6 @@ import com.raytheon.uf.common.monitor.xml.SourceIngestConfigXML;
import com.raytheon.uf.common.monitor.xml.SourceXML;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.stats.ProcessEvent;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
@ -120,9 +118,7 @@ import com.raytheon.uf.edex.plugin.ffmp.common.FFTIRatioDiff;
* 01/27/13 1478 D. Hladky Added creation of full cache records to help read write stress on NAS
* 02/01/13 1569 D. Hladky Added constants, switched to using aggregate records written through pypies
* 02/20/13 1635 D. Hladky Added some finally methods to increase dead lock safety. Reduced wait times for threads.
* Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin
* 02/25/13 1660 D. Hladky Redesigned data flow for FFTI in order to have only one mosaic piece in memory at a time.
* 03/22/13 1803 D. Hladky Fixed broken performance logging for ffmp.
* </pre>
*
* @author dhladky
@ -440,7 +436,7 @@ public class FFMPGenerator extends CompositeProductGenerator implements
@Override
public void generateProduct(URIGenerateMessage genMessage) {
if (loaded) {
if (loaded) {
try {
long time = System.currentTimeMillis();
this.config = new FFMPConfig(
@ -705,50 +701,51 @@ public class FFMPGenerator extends CompositeProductGenerator implements
fftiSources.add(ffmp.getFFTISource());
ffti.processFFTI();
}
// Do the accumulation now, more memory efficient.
// Only one piece in memory at a time
for (String attribute : ffmp.getAttributes()) {
if (attribute.equals(ATTRIBUTE.ACCUM
.getAttribute())) {
FFTIAccum accum = getAccumulationForSite(
ffmpProduct.getDisplayName(),
siteKey, dataKey,
fftiSource.getDurationHour(),
ffmpProduct.getUnit(siteKey));
if (statusHandler
.isPriorityEnabled(Priority.DEBUG)) {
statusHandler
.debug("Accumulating FFTI for source: "
+ ffmpProduct
.getDisplayName()
+ " site: "
+ siteKey
+ " data: "
+ dataKey
+ " duration: "
+ fftiSource
.getDurationHour()
+ " accumulation: "
+ accum.getAccumulation());
}
}
}
if (!source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
String sourceSiteDataKey = getSourceSiteDataKey(source,
dataKey, ffmpRec);
ffmpData.remove(sourceSiteDataKey);
statusHandler.info("Removing from memory: "+sourceSiteDataKey);
// Do the accumulation now, more memory efficient.
// Only one piece in memory at a time
for (String attribute : ffmp.getAttributes()) {
if (attribute.equals(ATTRIBUTE.ACCUM
.getAttribute())) {
FFTIAccum accum = getAccumulationForSite(
ffmpProduct.getDisplayName(),
siteKey, dataKey,
fftiSource.getDurationHour(),
ffmpProduct.getUnit(siteKey));
if (statusHandler
.isPriorityEnabled(Priority.DEBUG)) {
statusHandler
.debug("Accumulating FFTI for source: "
+ ffmpProduct
.getDisplayName()
+ " site: "
+ siteKey
+ " data: "
+ dataKey
+ " duration: "
+ fftiSource
.getDurationHour()
+ " accumulation: "
+ accum.getAccumulation());
}
}
}
} // ffmp.isFFTI
} // record not null
} // end sitekey for loop
} // end datakey loop
} // end process
}
SourceXML source = getSourceConfig().getSource(
ffmpRec.getSourceName());
if (!source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
String sourceSiteDataKey = getSourceSiteDataKey(source,
dataKey, ffmpRec);
ffmpData.remove(sourceSiteDataKey);
statusHandler.info("Removing from memory: "+sourceSiteDataKey);
}
}
}
}
}
}
/**
@ -1196,177 +1193,177 @@ public class FFMPGenerator extends CompositeProductGenerator implements
* @param ffmpRec
* @param productKey
*/
public void processDataContainer(FFMPRecord ffmpRec, String productKey) {
public void processDataContainer(FFMPRecord ffmpRec, String productKey) {
String sourceName = null;
Date backDate = null;
String sourceSiteDataKey = null;
FFMPDataContainer fdc = null;
boolean write = true;
String sourceName = null;
Date backDate = null;
String sourceSiteDataKey = null;
FFMPDataContainer fdc = null;
boolean write = true;
try {
// write out the fast loader cache file
long ptime = System.currentTimeMillis();
SourceXML source = getSourceConfig().getSource(
ffmpRec.getSourceName());
String dataKey = ffmpRec.getDataKey();
try {
// write out the fast loader cache file
long ptime = System.currentTimeMillis();
SourceXML source = getSourceConfig().getSource(
ffmpRec.getSourceName());
String dataKey = ffmpRec.getDataKey();
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
sourceName = source.getDisplayName();
sourceSiteDataKey = sourceName;
// FFG is so infrequent go back a day
backDate = new Date(config.getDate().getTime()
- (TimeUtil.MILLIS_PER_HOUR * FFG_SOURCE_CACHE_TIME));
} else {
sourceName = ffmpRec.getSourceName();
sourceSiteDataKey = sourceName + "-" + ffmpRec.getSiteKey()
+ "-" + dataKey;
backDate = new Date(ffmpRec.getDataTime().getRefTime()
.getTime()
- (TimeUtil.MILLIS_PER_HOUR * SOURCE_CACHE_TIME));
}
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
sourceName = source.getDisplayName();
sourceSiteDataKey = sourceName;
// FFG is so infrequent go back a day
backDate = new Date(config.getDate().getTime()
- (TimeUtil.MILLIS_PER_HOUR * FFG_SOURCE_CACHE_TIME));
} else {
sourceName = ffmpRec.getSourceName();
sourceSiteDataKey = sourceName + "-" + ffmpRec.getSiteKey()
+ "-" + dataKey;
backDate = new Date(ffmpRec.getDataTime().getRefTime()
.getTime()
- (TimeUtil.MILLIS_PER_HOUR * SOURCE_CACHE_TIME));
}
// deal with setting of needed HUCS
ArrayList<String> hucs = template.getTemplateMgr().getHucLevels();
// deal with setting of needed HUCS
ArrayList<String> hucs = template.getTemplateMgr().getHucLevels();
if (source.getSourceType().equals(SOURCE_TYPE.GAGE.getSourceType())
|| source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
hucs.clear();
hucs.add(FFMPRecord.ALL);
} else {
hucs.remove(FFMPRecord.VIRTUAL);
}
if (source.getSourceType().equals(SOURCE_TYPE.GAGE.getSourceType())
|| source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
hucs.clear();
hucs.add(FFMPRecord.ALL);
} else {
hucs.remove(FFMPRecord.VIRTUAL);
}
// pull from disk if there
fdc = getFFMPDataContainer(sourceSiteDataKey, hucs, backDate);
// pull from disk if there
fdc = getFFMPDataContainer(sourceSiteDataKey, hucs, backDate);
// brand new or initial load up
if (fdc == null || !loadedData.contains(sourceSiteDataKey)) {
// brand new or initial load up
if (fdc == null || !loadedData.contains(sourceSiteDataKey)) {
long time = System.currentTimeMillis();
fdc = new FFMPDataContainer(sourceSiteDataKey, hucs);
fdc = FFTIProcessor.populateDataContainer(fdc, template, hucs,
backDate, ffmpRec.getDataTime().getRefTime(),
ffmpRec.getWfo(), source, ffmpRec.getSiteKey());
long time = System.currentTimeMillis();
fdc = new FFMPDataContainer(sourceSiteDataKey, hucs);
fdc = FFTIProcessor.populateDataContainer(fdc, template, hucs,
backDate, ffmpRec.getDataTime().getRefTime(),
ffmpRec.getWfo(), source, ffmpRec.getSiteKey());
if (source.getSourceType().equals(
SOURCE_TYPE.GAGE.getSourceType())
|| source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
hucs.clear();
hucs.add(FFMPRecord.ALL);
} else {
hucs.remove(FFMPRecord.VIRTUAL);
}
if (source.getSourceType().equals(
SOURCE_TYPE.GAGE.getSourceType())
|| source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
hucs.clear();
hucs.add(FFMPRecord.ALL);
} else {
hucs.remove(FFMPRecord.VIRTUAL);
}
long time2 = System.currentTimeMillis();
statusHandler.handle(Priority.DEBUG,
"Populated new source: in " + (time2 - time)
+ " ms: source: " + sourceSiteDataKey);
long time2 = System.currentTimeMillis();
statusHandler.handle(Priority.DEBUG,
"Populated new source: in " + (time2 - time)
+ " ms: source: " + sourceSiteDataKey);
} else {
} else {
long time = System.currentTimeMillis();
// guidance sources are treated as a mosaic and are handled
// differently. They are force read at startup.
// This is the main line sequence a source will take when
// updated.
if (!source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
long time = System.currentTimeMillis();
// guidance sources are treated as a mosaic and are handled
// differently. They are force read at startup.
// This is the main line sequence a source will take when
// updated.
if (!source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
Date newDate = fdc.getNewest();
Date oldDate = fdc.getOldest();
Date newDate = fdc.getNewest();
Date oldDate = fdc.getOldest();
if (newDate != null && oldDate != null) {
if ((ffmpRec.getDataTime().getRefTime().getTime() - newDate
.getTime()) >= (source
.getExpirationMinutes(ffmpRec.getSiteKey()) * TimeUtil.MILLIS_PER_MINUTE)) {
// force a re-query back to the newest time in
// existing source container, this will fill in
// gaps
// if
// they exist.
fdc = FFTIProcessor.populateDataContainer(fdc,
template, null, newDate, ffmpRec
.getDataTime().getRefTime(),
ffmpRec.getWfo(), source, ffmpRec
.getSiteKey());
if (newDate != null && oldDate != null) {
if ((ffmpRec.getDataTime().getRefTime().getTime() - newDate
.getTime()) >= (source
.getExpirationMinutes(ffmpRec.getSiteKey()) * TimeUtil.MILLIS_PER_MINUTE)) {
// force a re-query back to the newest time in
// existing source container, this will fill in
// gaps
// if
// they exist.
fdc = FFTIProcessor.populateDataContainer(fdc,
template, null, newDate, ffmpRec
.getDataTime().getRefTime(),
ffmpRec.getWfo(), source, ffmpRec
.getSiteKey());
} else if (oldDate
.after(new Date(
backDate.getTime()
- (source
.getExpirationMinutes(ffmpRec
.getSiteKey()) * TimeUtil.MILLIS_PER_MINUTE)))) {
// force a re-query back to barrierTime for
// existing source container, this happens if
// the
// ingest was turned off for some period of
// time.
fdc = FFTIProcessor.populateDataContainer(fdc,
template, null, backDate, oldDate,
ffmpRec.getWfo(), source,
ffmpRec.getSiteKey());
}
}
} else if (oldDate
.after(new Date(
backDate.getTime()
- (source
.getExpirationMinutes(ffmpRec
.getSiteKey()) * TimeUtil.MILLIS_PER_MINUTE)))) {
// force a re-query back to barrierTime for
// existing source container, this happens if
// the
// ingest was turned off for some period of
// time.
fdc = FFTIProcessor.populateDataContainer(fdc,
template, null, backDate, oldDate,
ffmpRec.getWfo(), source,
ffmpRec.getSiteKey());
}
}
long time2 = System.currentTimeMillis();
statusHandler.handle(Priority.DEBUG,
"Checked Source files: in " + (time2 - time)
+ " ms: source: " + sourceSiteDataKey);
}
}
long time2 = System.currentTimeMillis();
statusHandler.handle(Priority.DEBUG,
"Checked Source files: in " + (time2 - time)
+ " ms: source: " + sourceSiteDataKey);
}
}
// add current record data
for (String huc : hucs) {
fdc.addFFMPEntry(ffmpRec.getDataTime().getRefTime(), source,
ffmpRec.getBasinData(huc), huc, ffmpRec.getSiteKey());
}
// add current record data
for (String huc : hucs) {
fdc.addFFMPEntry(ffmpRec.getDataTime().getRefTime(), source,
ffmpRec.getBasinData(huc), huc, ffmpRec.getSiteKey());
}
// cache it temporarily for FFTI use
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
// only write last one
write = false;
// cache it temporarily for FFTI use
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
// only write last one
write = false;
if (!ffmpData.containsKey(sourceSiteDataKey)) {
ffmpData.put(sourceSiteDataKey, fdc);
} else {
ffmpData.replace(sourceSiteDataKey, fdc);
}
}
if (!ffmpData.containsKey(sourceSiteDataKey)) {
ffmpData.put(sourceSiteDataKey, fdc);
} else {
ffmpData.replace(sourceSiteDataKey, fdc);
}
}
statusHandler.handle(
Priority.INFO,
"Processed FFMPDataContainer: in "
+ (System.currentTimeMillis() - ptime)
+ " ms: source: " + sourceSiteDataKey);
} catch (Exception e) {
statusHandler.handle(Priority.ERROR,
"Failed Processing FFMPDataContainer" + e.getMessage());
statusHandler.handle(
Priority.INFO,
"Processed FFMPDataContainer: in "
+ (System.currentTimeMillis() - ptime)
+ " ms: source: " + sourceSiteDataKey);
} catch (Exception e) {
statusHandler.handle(Priority.ERROR,
"Failed Processing FFMPDataContainer" + e.getMessage());
} finally {
// purge it up
if (fdc != null) {
// this is defensive for if errors get thrown
if (backDate == null) {
backDate = new Date((System.currentTimeMillis())
- (TimeUtil.MILLIS_PER_HOUR * SOURCE_CACHE_TIME));
}
} finally {
// purge it up
if (fdc != null) {
// this is defensive for if errors get thrown
if (backDate == null) {
backDate = new Date((System.currentTimeMillis())
- (TimeUtil.MILLIS_PER_HOUR * SOURCE_CACHE_TIME));
}
if (!fdc.isPurged()) {
fdc.purge(backDate);
}
if (!fdc.isPurged()) {
fdc.purge(backDate);
}
if (write) {
// write it out
writeAggregateRecord(fdc, sourceSiteDataKey);
}
}
}
}
if (write) {
// write it out
writeAggregateRecord(fdc, sourceSiteDataKey);
}
}
}
}
/**
* load existing container
@ -2017,42 +2014,31 @@ public class FFMPGenerator extends CompositeProductGenerator implements
}
/**
* Log process statistics
/**
* Find siteSourceDataKey
*
* @param message
* @param source
* @param dataKey
* @param ffmpRec
* @return
*/
@Override
public void log(URIGenerateMessage message) {
long curTime = System.currentTimeMillis();
ProcessEvent processEvent = new ProcessEvent();
if (productType != null) {
processEvent.setDataType(productType);
}
Long dequeueTime = message.getDeQueuedTime();
if (dequeueTime != null) {
long elapsedMilliseconds = curTime - dequeueTime;
processEvent.setProcessingTime(elapsedMilliseconds);
}
Long enqueueTime = message.getEnQueuedTime();
if (enqueueTime != null) {
long latencyMilliseconds = curTime - enqueueTime;
processEvent.setProcessingLatency(latencyMilliseconds);
}
// processing in less than 0 millis isn't trackable, usually due to
// an
// error occurred and statement logged incorrectly
if ((processEvent.getProcessingLatency() > 0)
&& (processEvent.getProcessingTime() > 0)) {
EventBus.publish(processEvent);
private String getSourceSiteDataKey(SourceXML source, String dataKey, FFMPRecord ffmpRec) {
String sourceName = source.getSourceName();
String sourceSiteDataKey = null;
if (source.getSourceType().equals(
SOURCE_TYPE.GUIDANCE.getSourceType())) {
sourceName = source.getDisplayName();
sourceSiteDataKey = sourceName;
} else {
sourceName = ffmpRec.getSourceName();
sourceSiteDataKey = sourceName + "-" + ffmpRec.getSiteKey()
+ "-" + dataKey;
}
return sourceSiteDataKey;
}
}
}