Merge "Issue #2681: Update stats aggregate to work in discrete time chunks." into development

Former-commit-id: be9fffd922 [formerly 68cb07ffdb] [formerly 76442c8d0c] [formerly 0bb34b225f [formerly 76442c8d0c [formerly 4ec6b3cfc640600335fbeb3f3e5377ff62b2b5a9]]]
Former-commit-id: 0bb34b225f
Former-commit-id: 8eed7c669eaef414bb0341039c14313355a2da4a [formerly 5ae5323d74]
Former-commit-id: eb7d29b84c
This commit is contained in:
Richard Peter 2014-04-21 13:57:28 -05:00 committed by Gerrit Code Review
commit c74c860742
2 changed files with 82 additions and 28 deletions

View file

@ -78,6 +78,7 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
* Mar 27, 2013 1802 bphillip Made jaxb manager static and changed visibility of a method
* May 22, 2013 1917 rjpeter Added ability to save raw and aggregate stats, to reclaimSpace every scan call,
* and to not pretty print xml grouping information.
* Apr 18, 2014 2681 rjpeter Updated scan to process in distinct chunks of time.
* </pre>
*
* @author jsanchez
@ -243,35 +244,65 @@ public class AggregateManager {
String type = entry.getKey();
StatisticsEventConfig event = entry.getValue();
List<StatsRecord> records = null;
Calendar minTime = statsRecordDao.retrieveMinTime(type);
do {
// retrieve stats in blocks of 1000
records = statsRecordDao.retrieveRecords(timeToProcess, type,
2000);
if ((minTime != null) && minTime.before(timeToProcess)) {
/*
* process in one-minute chunks to avoid overwhelming the database
* and to produce consistent results if stats has been down for a
* period of time.
*/
Calendar maxTime = (Calendar) minTime.clone();
maxTime.add(Calendar.MINUTE, 1);
if (!CollectionUtil.isNullOrEmpty(records)) {
// sort events into time buckets
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMap = sort(
event, records);
// not checking before since we want before or equal to
while (!maxTime.after(timeToProcess) && maxTime.after(minTime)) {
records = statsRecordDao.retrieveRecords(type, minTime,
maxTime);
for (Map.Entry<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMapEntry : timeMap
.entrySet()) {
aggregate(event, timeMapEntry.getKey(),
timeMapEntry.getValue());
}
if (!CollectionUtil.isNullOrEmpty(records)) {
// sort events into time buckets
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMap = sort(
event, records);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records", e);
}
for (Map.Entry<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMapEntry : timeMap
.entrySet()) {
aggregate(event, timeMapEntry.getKey(),
timeMapEntry.getValue());
}
count += records.size();
if (event.getRawOfflineRetentionDays() >= 0) {
offline.writeStatsToDisk(event, timeMap);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records",
e);
}
count += records.size();
if (event.getRawOfflineRetentionDays() >= 0) {
offline.writeStatsToDisk(event, timeMap);
}
// increment to next interval
minTime.add(Calendar.MINUTE, 1);
maxTime.add(Calendar.MINUTE, 1);
} else {
maxTime.add(Calendar.MINUTE, 1);
// check if at end of interval
if (maxTime.before(timeToProcess)) {
// no records found in interval, find next interval
minTime = statsRecordDao.retrieveMinTime(type);
if (minTime == null) {
break;
}
maxTime.setTimeInMillis(minTime.getTimeInMillis());
maxTime.add(Calendar.MINUTE, 1);
}
}
}
} while (!CollectionUtil.isNullOrEmpty(records));
}
}
statsRecordDao.reclaimSpace();

View file

@ -42,6 +42,7 @@ import com.raytheon.uf.edex.database.dao.SessionManagedDao;
* Aug 21, 2012 jsanchez Initial creation
* Mar 18, 2013 1082 bphillip Modified to extend sessionmanagedDao and use spring injection
* May 22, 2013 1917 rjpeter Added reclaimSpace.
* Apr 18, 2014 2681 rjpeter Added retrieveMinTime.
* </pre>
*
* @author jsanchez
@ -56,7 +57,29 @@ public class StatsDao extends SessionManagedDao<Integer, StatsRecord> {
}
/**
* Retrieves stat records that has a date before the limit.
* Retrieves the earliest time in the stats table for a given data type.
*
* @param eventType
* @return
* @throws DataAccessLayerException
*/
public Calendar retrieveMinTime(String eventType)
        throws DataAccessLayerException {
    // single aggregate query: earliest date recorded for this event type
    List<Object> rows = this.executeHQLQuery(
            "select min(rec.date) from StatsRecord rec where rec.eventType = :eventType",
            "eventType", eventType);

    // nothing came back from the query, so there is no minimum to report
    if ((rows == null) || rows.isEmpty()) {
        return null;
    }

    // the first element may itself be null; the cast passes null through
    // unchanged, so callers still receive null in that case
    return (Calendar) rows.get(0);
}
/**
* Retrieves stat records that have a date in the time range.
*
* @param limit
* @param eventType
@ -66,11 +89,11 @@ public class StatsDao extends SessionManagedDao<Integer, StatsRecord> {
* size 0 will be returned.
* @throws DataAccessLayerException
*/
public List<StatsRecord> retrieveRecords(Calendar limit, String eventType,
int maxResults) throws DataAccessLayerException {
String hql = "from StatsRecord rec where rec.eventType = :eventType and rec.date < :date order by rec.date asc";
return this.query(hql, maxResults, "eventType", eventType, "date",
limit);
public List<StatsRecord> retrieveRecords(String eventType,
        Calendar minTime, Calendar maxTime) throws DataAccessLayerException {
    // half-open range: minTime is inclusive (>=), maxTime is exclusive (<);
    // results are ordered by ascending record date
    final String rangeQuery = "from StatsRecord rec where rec.eventType = :eventType and rec.date >= :minDate and rec.date < :maxDate order by rec.date asc";

    // parameters are bound by name, as name/value pairs
    List<StatsRecord> matches = this.query(rangeQuery,
            "eventType", eventType,
            "minDate", minTime,
            "maxDate", maxTime);
    return matches;
}
@Override