Issue #2681: Update stats aggregate to work in discrete time chunks.

Change-Id: I999c6676d5b2464eb8f5467b5155f842c365798e

Former-commit-id: 23a71c7f13 [formerly 23a71c7f13 [formerly 1d18f8f5b3fb1986f197c27b765dabec84343dac]]
Former-commit-id: 78ab2738d0
Former-commit-id: 2b86613c4f
This commit is contained in:
Richard Peter 2014-04-18 16:39:45 -05:00
parent db4bb7b6c6
commit 84c906ccb2
2 changed files with 82 additions and 28 deletions

View file

@ -78,6 +78,7 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
* Mar 27, 2013 1802 bphillip Made jaxb manager static and changed visibility of a method
* May 22, 2013 1917 rjpeter Added ability to save raw and aggregate stats, to reclaimSpace every scan call,
* and to not pretty print xml grouping information.
* Apr 18, 2014 2681 rjpeter Updated scan to process in distinct chunks of time.
* </pre>
*
* @author jsanchez
@ -243,35 +244,65 @@ public class AggregateManager {
String type = entry.getKey();
StatisticsEventConfig event = entry.getValue();
List<StatsRecord> records = null;
Calendar minTime = statsRecordDao.retrieveMinTime(type);
do {
// retrieve stats in blocks of 1000
records = statsRecordDao.retrieveRecords(timeToProcess, type,
2000);
if ((minTime != null) && minTime.before(timeToProcess)) {
/*
* process in minute chunks to avoid overwhelming the database
* and having consistent results if stats is down for a period
* of time.
*/
Calendar maxTime = (Calendar) minTime.clone();
maxTime.add(Calendar.MINUTE, 1);
if (!CollectionUtil.isNullOrEmpty(records)) {
// sort events into time buckets
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMap = sort(
event, records);
// not checking before since we want before or equal to
while (!maxTime.after(timeToProcess) && maxTime.after(minTime)) {
records = statsRecordDao.retrieveRecords(type, minTime,
maxTime);
for (Map.Entry<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMapEntry : timeMap
.entrySet()) {
aggregate(event, timeMapEntry.getKey(),
timeMapEntry.getValue());
}
if (!CollectionUtil.isNullOrEmpty(records)) {
// sort events into time buckets
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMap = sort(
event, records);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records", e);
}
for (Map.Entry<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMapEntry : timeMap
.entrySet()) {
aggregate(event, timeMapEntry.getKey(),
timeMapEntry.getValue());
}
count += records.size();
if (event.getRawOfflineRetentionDays() >= 0) {
offline.writeStatsToDisk(event, timeMap);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records",
e);
}
count += records.size();
if (event.getRawOfflineRetentionDays() >= 0) {
offline.writeStatsToDisk(event, timeMap);
}
// increment to next interval
minTime.add(Calendar.MINUTE, 1);
maxTime.add(Calendar.MINUTE, 1);
} else {
maxTime.add(Calendar.MINUTE, 1);
// check if at end of interval
if (maxTime.before(timeToProcess)) {
// no records found in interval, find next interval
minTime = statsRecordDao.retrieveMinTime(type);
if (minTime == null) {
break;
}
maxTime.setTimeInMillis(minTime.getTimeInMillis());
maxTime.add(Calendar.MINUTE, 1);
}
}
}
} while (!CollectionUtil.isNullOrEmpty(records));
}
}
statsRecordDao.reclaimSpace();

View file

@ -42,6 +42,7 @@ import com.raytheon.uf.edex.database.dao.SessionManagedDao;
* Aug 21, 2012 jsanchez Initial creation
* Mar 18, 2013 1082 bphillip Modified to extend sessionmanagedDao and use spring injection
* May 22, 2013 1917 rjpeter Added reclaimSpace.
* Apr 18, 2014 2681 rjpeter Added retrieveMinTime.
* </pre>
*
* @author jsanchez
@ -56,7 +57,29 @@ public class StatsDao extends SessionManagedDao<Integer, StatsRecord> {
}
/**
* Retrieves stat records that has a date before the limit.
* Retrieves the earliest time in the stats table for a given data type.
*
* @param eventType
* @return
* @throws DataAccessLayerException
*/
public Calendar retrieveMinTime(String eventType)
throws DataAccessLayerException {
String hql = "select min(rec.date) from StatsRecord rec where rec.eventType = :eventType";
List<Object> results = this
.executeHQLQuery(hql, "eventType", eventType);
if ((results != null) && !results.isEmpty()) {
Object time = results.get(0);
if (time != null) {
return (Calendar) time;
}
}
return null;
}
/**
* Retrieves stat records that has a date in the time range.
*
* @param limit
* @param eventType
@ -66,11 +89,11 @@ public class StatsDao extends SessionManagedDao<Integer, StatsRecord> {
* size 0 will be returned.
* @throws DataAccessLayerException
*/
public List<StatsRecord> retrieveRecords(Calendar limit, String eventType,
int maxResults) throws DataAccessLayerException {
String hql = "from StatsRecord rec where rec.eventType = :eventType and rec.date < :date order by rec.date asc";
return this.query(hql, maxResults, "eventType", eventType, "date",
limit);
public List<StatsRecord> retrieveRecords(String eventType,
Calendar minTime, Calendar maxTime) throws DataAccessLayerException {
String hql = "from StatsRecord rec where rec.eventType = :eventType and rec.date >= :minDate and rec.date < :maxDate order by rec.date asc";
return this.query(hql, "eventType", eventType, "minDate", minTime,
"maxDate", maxTime);
}
@Override