Issue #1912 decrease the number of seperate database requests in the FFMPDataLoader.
Former-commit-id:7a43a052f6
[formerly 1ba1f77bab277fb191a416e02f5cc56a0147b15d] Former-commit-id:a0fb4b9b5b
This commit is contained in:
parent
f842d5d036
commit
bd751b44f0
2 changed files with 184 additions and 108 deletions
|
@ -11,6 +11,7 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
|
@ -34,6 +35,11 @@ import com.raytheon.uf.common.dataplugin.ffmp.FFMPRecord.FIELDS;
|
|||
import com.raytheon.uf.common.dataplugin.ffmp.FFMPTemplates;
|
||||
import com.raytheon.uf.common.dataplugin.ffmp.FFMPTemplates.MODE;
|
||||
import com.raytheon.uf.common.dataplugin.ffmp.FFMPVirtualGageBasin;
|
||||
import com.raytheon.uf.common.dataquery.requests.DbQueryRequest;
|
||||
import com.raytheon.uf.common.dataquery.requests.DbQueryRequest.OrderMode;
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestConstraint.ConstraintType;
|
||||
import com.raytheon.uf.common.dataquery.responses.DbQueryResponse;
|
||||
import com.raytheon.uf.common.datastorage.DataStoreFactory;
|
||||
import com.raytheon.uf.common.datastorage.IDataStore;
|
||||
import com.raytheon.uf.common.monitor.config.FFFGDataMgr;
|
||||
|
@ -56,6 +62,7 @@ import com.raytheon.uf.viz.core.VizApp;
|
|||
import com.raytheon.uf.viz.core.catalog.DirectDbQuery;
|
||||
import com.raytheon.uf.viz.core.catalog.DirectDbQuery.QueryLanguage;
|
||||
import com.raytheon.uf.viz.core.exception.VizException;
|
||||
import com.raytheon.uf.viz.core.requests.ThriftClient;
|
||||
import com.raytheon.uf.viz.monitor.IMonitor;
|
||||
import com.raytheon.uf.viz.monitor.ResourceMonitor;
|
||||
import com.raytheon.uf.viz.monitor.events.IMonitorConfigurationEvent;
|
||||
|
@ -118,7 +125,16 @@ public class FFMPMonitor extends ResourceMonitor {
|
|||
private String wfo = null;
|
||||
|
||||
/** Pattern for dates in radar */
|
||||
public static String datePattern = "yyyy-MM-dd HH:mm:ss";
|
||||
public static ThreadLocal<SimpleDateFormat> datePattern = new ThreadLocal<SimpleDateFormat>() {
|
||||
|
||||
@Override
|
||||
protected SimpleDateFormat initialValue() {
|
||||
SimpleDateFormat datef = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
datef.setTimeZone(TimeZone.getTimeZone("Zulu"));
|
||||
return datef;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private FFMPSiteDataContainer siteDataMap = new FFMPSiteDataContainer();
|
||||
|
||||
|
@ -490,36 +506,6 @@ public class FFMPMonitor extends ResourceMonitor {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Getting a specific URI
|
||||
*
|
||||
* @param date
|
||||
* @return
|
||||
*/
|
||||
public String getAvailableUri(String siteKey, String dataKey,
|
||||
String sourceName, Date ptime) {
|
||||
|
||||
String uri = null;
|
||||
SimpleDateFormat datef = new SimpleDateFormat(datePattern);
|
||||
datef.setTimeZone(TimeZone.getTimeZone("Zulu"));
|
||||
String sql = "select datauri from ffmp where wfo = '" + getWfo()
|
||||
+ "' and reftime = '" + datef.format(ptime)
|
||||
+ "' and sourcename = '" + sourceName + "' and sitekey = '"
|
||||
+ siteKey + "' and datakey = '" + dataKey
|
||||
+ "' order by reftime";
|
||||
try {
|
||||
List<Object[]> results = DirectDbQuery.executeQuery(sql,
|
||||
"metadata", QueryLanguage.SQL);
|
||||
if (results.size() > 0) {
|
||||
uri = (String) results.get(0)[0];
|
||||
}
|
||||
} catch (VizException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
return uri;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get me the available URIS for this source
|
||||
*
|
||||
|
@ -543,6 +529,112 @@ public class FFMPMonitor extends ResourceMonitor {
|
|||
|
||||
return uris;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform a single database request to populate the availableUris for
|
||||
* multiple sources. After preloading the uris the uris for each source can
|
||||
* be retrieved with getAvailableUris
|
||||
*
|
||||
*/
|
||||
public void preloadAvailableUris(String siteKey, String dataKey,
|
||||
Set<String> sourceNames, Date time) {
|
||||
DbQueryRequest request = new DbQueryRequest();
|
||||
request.setEntityClass(FFMPRecord.class);
|
||||
request.addRequestField("dataURI");
|
||||
request.setOrderByField("dataTime.refTime", OrderMode.DESC);
|
||||
|
||||
request.addConstraint("wfo", new RequestConstraint(getWfo()));
|
||||
request.addConstraint("siteKey", new RequestConstraint(siteKey));
|
||||
request.addConstraint("dataTime.refTime", new RequestConstraint(
|
||||
datePattern.get().format(time),
|
||||
ConstraintType.GREATER_THAN_EQUALS));
|
||||
|
||||
RequestConstraint sourceRC = new RequestConstraint(null,
|
||||
ConstraintType.IN);
|
||||
sourceRC.setConstraintValueList(sourceNames);
|
||||
request.addConstraint("sourceName", sourceRC);
|
||||
try {
|
||||
handleURIRequest(request, siteKey, dataKey, time);
|
||||
FFMPSiteData siteData = siteDataMap.get(siteKey);
|
||||
for (String sourceName : sourceNames) {
|
||||
// This is done to ensure that the previous query time is
|
||||
// updated, even for sources with no data.
|
||||
FFMPSourceData sourceData = siteData.getSourceData(sourceName);
|
||||
Date oldPrevTime = sourceData.getPreviousUriQueryDate();
|
||||
if (oldPrevTime == null || time.before(oldPrevTime)) {
|
||||
sourceData.setPreviousUriQueryDate(time);
|
||||
}
|
||||
}
|
||||
} catch (VizException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"FFMP Can't find availble URI list for, " + sourceNames, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* handle a pre assembled database query for uris. The request is sent to
|
||||
* edex and the response is parsed to populated the uris in the siteDataMap.
|
||||
*
|
||||
*/
|
||||
private void handleURIRequest(DbQueryRequest request, String siteKey,
|
||||
String dataKey, Date time) throws VizException {
|
||||
FFMPSiteData siteData = siteDataMap.get(siteKey);
|
||||
|
||||
DbQueryResponse dbResponse = (DbQueryResponse) ThriftClient
|
||||
.sendRequest(request);
|
||||
Map<String, List<FFMPRecord>> recordsBySource = new HashMap<String, List<FFMPRecord>>();
|
||||
|
||||
for (String datauri : dbResponse.getFieldObjects("dataURI",
|
||||
String.class)) {
|
||||
FFMPRecord record = new FFMPRecord(datauri);
|
||||
List<FFMPRecord> records = recordsBySource.get(record
|
||||
.getSourceName());
|
||||
if (records == null) {
|
||||
records = new ArrayList<FFMPRecord>();
|
||||
recordsBySource.put(record.getSourceName(), records);
|
||||
}
|
||||
records.add(record);
|
||||
}
|
||||
for (Entry<String, List<FFMPRecord>> entry : recordsBySource.entrySet()) {
|
||||
String sourceName = entry.getKey();
|
||||
SourceXML sourceXml = getSourceConfig().getSource(sourceName);
|
||||
boolean isMosiac = sourceXml.isMosaic();
|
||||
FFMPSourceData sourceData = siteData.getSourceData(sourceName);
|
||||
Map<Date, List<String>> sortedUris = sourceData.getAvailableUris();
|
||||
|
||||
List<String> list = new LinkedList<String>();
|
||||
Date prevRefTime = null;
|
||||
for (FFMPRecord rec : entry.getValue()) {
|
||||
if (!isMosiac && !rec.getDataKey().equals(dataKey)) {
|
||||
continue;
|
||||
}
|
||||
Date curRefTime = rec.getDataTime().getRefTime();
|
||||
if ((prevRefTime != null) && !prevRefTime.equals(curRefTime)) {
|
||||
sortedUris.put(prevRefTime, list);
|
||||
list = new LinkedList<String>();
|
||||
}
|
||||
prevRefTime = curRefTime;
|
||||
list.add(rec.getDataURI());
|
||||
}
|
||||
|
||||
if (prevRefTime != null) {
|
||||
sortedUris.put(prevRefTime, list);
|
||||
}
|
||||
|
||||
Date prevTime = time;
|
||||
if (sourceXml.getSourceType().equals(
|
||||
SOURCE_TYPE.GUIDANCE.getSourceType())) {
|
||||
long timeOffset = sourceXml.getExpirationMinutes(siteKey)
|
||||
* TimeUtil.MILLIS_PER_MINUTE;
|
||||
prevTime = new Date(time.getTime() - timeOffset);
|
||||
}
|
||||
Date oldPrevTime = sourceData.getPreviousUriQueryDate();
|
||||
if (oldPrevTime == null || prevTime.before(oldPrevTime)) {
|
||||
sourceData.setPreviousUriQueryDate(prevTime);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the available uris back to a given time
|
||||
|
@ -557,94 +649,55 @@ public class FFMPMonitor extends ResourceMonitor {
|
|||
.get(siteKey).getSourceData(sourceName).getAvailableUris();
|
||||
Date previousQueryTime = siteDataMap.get(siteKey)
|
||||
.getSourceData(sourceName).getPreviousUriQueryDate();
|
||||
Date earliestTime = time;
|
||||
|
||||
SourceXML source = getSourceConfig().getSource(sourceName);
|
||||
|
||||
if (source.getSourceType().equals(SOURCE_TYPE.GUIDANCE.getSourceType())) {
|
||||
// Always look back for guidance types because of long expiration
|
||||
// times,
|
||||
// prevents mosaic brittleness from occurring.
|
||||
// times, prevents mosaic brittleness from occurring.
|
||||
retrieveNew = true;
|
||||
|
||||
long timeOffset = source.getExpirationMinutes(siteKey)
|
||||
* TimeUtil.MILLIS_PER_MINUTE;
|
||||
earliestTime = new Date(time.getTime() - timeOffset);
|
||||
}
|
||||
|
||||
if (retrieveNew
|
||||
|| ((time != null) && ((previousQueryTime == null) || (time
|
||||
.getTime() < previousQueryTime.getTime())))) {
|
||||
SimpleDateFormat datef = new SimpleDateFormat(datePattern);
|
||||
datef.setTimeZone(TimeZone.getTimeZone("Zulu"));
|
||||
Date earliestTime = time;
|
||||
StringBuilder query = new StringBuilder(200);
|
||||
query.append("select datauri from ffmp where wfo = '");
|
||||
query.append(getWfo());
|
||||
query.append("' and sourcename = '");
|
||||
query.append(sourceName);
|
||||
|| (time != null && (previousQueryTime == null || time
|
||||
.before(previousQueryTime)))) {
|
||||
DbQueryRequest request = new DbQueryRequest();
|
||||
request.setEntityClass(FFMPRecord.class);
|
||||
request.addRequestField("dataURI");
|
||||
request.setOrderByField("dataTime.refTime", OrderMode.DESC);
|
||||
|
||||
// GUIDANCE we save by displayName, *type*
|
||||
if (source.getSourceType().equals(
|
||||
SOURCE_TYPE.GUIDANCE.getSourceType())) {
|
||||
|
||||
long timeOffset = source.getExpirationMinutes(siteKey)
|
||||
* TimeUtil.MILLIS_PER_MINUTE;
|
||||
earliestTime = new Date(time.getTime() - timeOffset);
|
||||
}
|
||||
|
||||
query.append("' and sitekey = '");
|
||||
query.append(siteKey);
|
||||
request.addConstraint("wfo", new RequestConstraint(getWfo()));
|
||||
request.addConstraint("sourceName", new RequestConstraint(
|
||||
sourceName));
|
||||
request.addConstraint("siteKey", new RequestConstraint(siteKey));
|
||||
if (!source.isMosaic()) {
|
||||
query.append("' and datakey = '");
|
||||
query.append(dataKey);
|
||||
request.addConstraint("dataKey", new RequestConstraint(dataKey));
|
||||
|
||||
}
|
||||
|
||||
query.append("' and reftime >= '");
|
||||
query.append(datef.format(earliestTime));
|
||||
String earliestTimeString = datePattern.get().format(earliestTime);
|
||||
|
||||
if (!retrieveNew && (previousQueryTime != null)) {
|
||||
query.append("' and reftime < '");
|
||||
query.append(datef.format(previousQueryTime));
|
||||
String latestTimeString = datePattern.get().format(
|
||||
previousQueryTime);
|
||||
RequestConstraint timeRC = new RequestConstraint(null,
|
||||
ConstraintType.BETWEEN);
|
||||
timeRC.setBetweenValueList(new String[] { earliestTimeString,
|
||||
latestTimeString });
|
||||
request.addConstraint("dataTime.refTime", timeRC);
|
||||
} else {
|
||||
request.addConstraint("dataTime.refTime",
|
||||
new RequestConstraint(earliestTimeString,
|
||||
ConstraintType.GREATER_THAN_EQUALS));
|
||||
}
|
||||
|
||||
query.append("' order by reftime desc");
|
||||
|
||||
try {
|
||||
List<Object[]> results = DirectDbQuery.executeQuery(
|
||||
query.toString(), "metadata", QueryLanguage.SQL);
|
||||
List<String> list = new LinkedList<String>();
|
||||
Date prevRefTime = null;
|
||||
|
||||
for (int j = 0; j < results.size(); j++) {
|
||||
if (results.size() > 0) {
|
||||
Object[] results2 = results.get(j);
|
||||
// System.out.println("Querrying for URIs: "
|
||||
// + query.toString() + " # " + results2.length);
|
||||
|
||||
for (int i = 0; i < results2.length; i++) {
|
||||
String uri = (String) results2[0];
|
||||
FFMPRecord rec = new FFMPRecord(uri);
|
||||
Date curRefTime = rec.getDataTime().getRefTime();
|
||||
if ((prevRefTime != null)
|
||||
&& !prevRefTime.equals(curRefTime)) {
|
||||
sortedUris.put(prevRefTime, list);
|
||||
list = new LinkedList<String>();
|
||||
}
|
||||
|
||||
prevRefTime = curRefTime;
|
||||
list.add(uri);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (list != null) {
|
||||
if ((prevRefTime == null) || (list == null)) {
|
||||
statusHandler.handle(Priority.WARN,
|
||||
"Source prevTime or list = null: " + sourceName
|
||||
+ " Date: " + time);
|
||||
} else {
|
||||
sortedUris.put(prevRefTime, list);
|
||||
}
|
||||
}
|
||||
|
||||
siteDataMap.get(siteKey).getSourceData(sourceName)
|
||||
.setPreviousUriQueryDate(time);
|
||||
|
||||
handleURIRequest(request, siteKey, dataKey, time);
|
||||
} catch (VizException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"FFMP Can't find availble URI list for, " + sourceName,
|
||||
|
|
|
@ -23,9 +23,12 @@ import java.io.File;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.ffmp.FFMPAggregateRecord;
|
||||
|
@ -169,15 +172,36 @@ public class FFMPDataLoader extends Thread {
|
|||
.getSourceConfig();
|
||||
|
||||
ProductRunXML productRun = runner.getProduct(siteKey);
|
||||
ArrayList<String> qpfSources = new ArrayList<String>();
|
||||
ArrayList<SourceXML> qpfSources = new ArrayList<SourceXML>();
|
||||
String layer = config.getFFMPConfigData().getLayer();
|
||||
boolean isProductLoad = true;
|
||||
String rateURI = null;
|
||||
|
||||
if (loadType != LOADER_TYPE.GENERAL) {
|
||||
// preload all the uris except guidance. Guidance loads data
|
||||
// much further back and it is not efficient to group with the
|
||||
// rest.
|
||||
Set<String> sources = new HashSet<String>();
|
||||
sources.add(product.getRate());
|
||||
sources.add(product.getQpe());
|
||||
sources.add(product.getVirtual());
|
||||
for (String qpfType : productRun.getQpfTypes(product)) {
|
||||
for (SourceXML qpfSource : productRun.getQpfSources(
|
||||
product, qpfType)) {
|
||||
sources.add(qpfSource.getSourceName());
|
||||
}
|
||||
}
|
||||
monitor.preloadAvailableUris(siteKey, dataKey, sources,
|
||||
timeBack);
|
||||
}
|
||||
if ((loadType == LOADER_TYPE.INITIAL || loadType == LOADER_TYPE.GENERAL)
|
||||
&& !product.getRate().equals(product.getQpe())) {
|
||||
rateURI = monitor.getAvailableUri(siteKey, dataKey,
|
||||
product.getRate(), mostRecentTime);
|
||||
Map<Date, List<String>> rateURIs = monitor
|
||||
.getAvailableUris(siteKey, dataKey, product.getRate(),
|
||||
mostRecentTime);
|
||||
if (rateURIs.containsKey(mostRecentTime)) {
|
||||
rateURI = rateURIs.get(mostRecentTime).get(0);
|
||||
}
|
||||
}
|
||||
|
||||
NavigableMap<Date, List<String>> qpeURIs = monitor
|
||||
|
@ -203,7 +227,7 @@ public class FFMPDataLoader extends Thread {
|
|||
|
||||
if (qpfURIs != null && !qpfURIs.isEmpty()) {
|
||||
qpfs.add(qpfURIs);
|
||||
qpfSources.add(qpfSource.getSourceName());
|
||||
qpfSources.add(qpfSource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -298,8 +322,7 @@ public class FFMPDataLoader extends Thread {
|
|||
|
||||
if (loadType == LOADER_TYPE.INITIAL) {
|
||||
|
||||
SourceXML source = sourceConfig
|
||||
.getSource(qpfSources.get(i));
|
||||
SourceXML source = qpfSources.get(i);
|
||||
|
||||
String pdataKey = findQPFHomeDataKey(source);
|
||||
qpfCache = readAggregateRecord(source, pdataKey, wfo);
|
||||
|
|
Loading…
Add table
Reference in a new issue