Issue #1350: Fix stats aggregation

Change-Id: I652db6f6f5bc0cab8fc9e85969bf57f1c2382da3

Former-commit-id: 71574f210d [formerly 94df67d354] [formerly 71574f210d [formerly 94df67d354] [formerly 2b124db62a [formerly b06d51266d600e0b04f7fe10bc36e312a2275428]]]
Former-commit-id: 2b124db62a
Former-commit-id: 99f19186ef [formerly 5a45254db0]
Former-commit-id: 9a51a3d3b2
This commit is contained in:
Richard Peter 2012-11-29 14:30:03 -06:00
parent 9112fd0b14
commit 55ae5a07cf
13 changed files with 531 additions and 413 deletions

View file

@ -34,6 +34,7 @@ import com.raytheon.uf.common.localization.ILocalizationAdapter.ListResponse;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.exception.LocalizationException;
import com.raytheon.uf.common.localization.exception.LocalizationOpFailedException;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
@ -165,7 +166,7 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
private LocalizationLevel protectedLevel;
/** File changed observers */
private Set<ILocalizationFileObserver> observers = new HashSet<ILocalizationFileObserver>();
private final Set<ILocalizationFileObserver> observers = new HashSet<ILocalizationFileObserver>();
/** Flag to set if file has been requested */
protected boolean fileRequested = false;
@ -187,8 +188,8 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
* @return
*/
boolean isNull() {
return adapter == null && path == null && context == null
&& file == null;
return (adapter == null) && (path == null) && (context == null)
&& (file == null);
}
LocalizationFile(ILocalizationAdapter adapter, LocalizationContext context,
@ -279,7 +280,7 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
adapter.retrieve(this);
}
if (isDirectory == false && !file.exists()) {
if ((isDirectory == false) && !file.exists()) {
try {
file.getParentFile().mkdirs();
} catch (Throwable t) {
@ -352,8 +353,8 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
// Read in the bytes
int offset = 0;
int numRead = 0;
while (offset < rval.length
&& (numRead = is.read(rval, offset, rval.length - offset)) >= 0) {
while ((offset < rval.length)
&& ((numRead = is.read(rval, offset, rval.length - offset)) >= 0)) {
offset += numRead;
}
@ -548,7 +549,7 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
* @return true if the file exists
*/
public boolean exists() {
return isNull() == false && adapter.exists(this);
return (isNull() == false) && adapter.exists(this);
}
/**
@ -596,6 +597,44 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
}
}
/**
 * Deserializes this localization file's JAXB XML content into an object of
 * the requested type.
 *
 * @param <T>
 *            type of the unmarshalled object
 * @param resultClass
 *            class to cast the unmarshalled object to
 * @param manager
 *            JAXB manager used to perform the unmarshalling
 * @return the unmarshalled object, or null when the file does not exist or
 *         is empty
 * @throws LocalizationException
 *             if the file content cannot be unmarshalled
 */
public <T> T jaxbUnmarshal(Class<T> resultClass, JAXBManager manager)
        throws LocalizationException {
    File localFile = getFile();
    // Nothing to unmarshal for a missing or zero-length file.
    if (!localFile.exists() || (localFile.length() <= 0)) {
        return null;
    }
    InputStream stream = null;
    try {
        stream = openInputStream();
        return resultClass.cast(manager.jaxbUnmarshalFromInputStream(stream));
    } catch (Exception e) {
        throw new LocalizationException("Could not unmarshal file "
                + file.getName(), e);
    } finally {
        // Close manually (not try-with-resources) so a failed close is
        // logged as a warning instead of masking the real outcome.
        if (stream != null) {
            try {
                stream.close();
            } catch (IOException e) {
                statusHandler.handle(Priority.WARN,
                        "Failed to close input stream for file", e);
            }
        }
    }
}
@Override
public String toString() {
return context + IPathManager.SEPARATOR + path;

View file

@ -19,6 +19,7 @@
**/
package com.raytheon.uf.common.stats.xml;
import java.lang.reflect.Method;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
@ -28,6 +29,7 @@ import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElements;
import javax.xml.bind.annotation.XmlRootElement;
import com.raytheon.uf.common.event.Event;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@ -72,6 +74,12 @@ public class StatisticsEvent {
@DynamicSerializeElement
private List<StatisticsAggregate> aggregateList;
private Class<? extends Event> typeClass = null;
private List<Method> groupByMethods = null;
private List<Method> aggregateMethods = null;
/**
* @return the aggregateList
*/
@ -146,4 +154,29 @@ public class StatisticsEvent {
public void setType(String type) {
this.type = type;
}
/** @return the cached concrete Event subclass for this event type, or null if not yet set */
public Class<? extends Event> getTypeClass() {
return typeClass;
}
/**
 * Caches the concrete Event subclass this statistics event describes.
 *
 * @param typeClass
 *            the event implementation class
 */
public void setTypeClass(Class<? extends Event> typeClass) {
this.typeClass = typeClass;
}
/** @return cached reflection getters for the group-by fields, or null if not yet resolved */
public List<Method> getGroupByMethods() {
return groupByMethods;
}
/**
 * Caches the reflection getters for the group-by fields. Assumed to be in
 * the same order as the group list entries — callers iterate both in
 * parallel.
 *
 * @param groupByMethods
 *            getter methods, one per group-by field
 */
public void setGroupByMethods(List<Method> groupByMethods) {
this.groupByMethods = groupByMethods;
}
/** @return cached reflection getters for the aggregate fields, or null if not yet resolved */
public List<Method> getAggregateMethods() {
return aggregateMethods;
}
/**
 * Caches the reflection getters for the aggregate fields. Assumed to be in
 * the same order as the aggregate list entries — callers iterate both in
 * parallel.
 *
 * @param aggregateMethods
 *            getter methods, one per aggregate field
 */
public void setAggregateMethods(List<Method> aggregateMethods) {
this.aggregateMethods = aggregateMethods;
}
}

View file

@ -83,7 +83,7 @@ public final class ReflectionUtil {
}
}
public static Object getter(Object object, String name)
public static Method getGetterMethod(Class<?> clazz, String name)
throws ReflectionException {
// Assume camel cased names - capitalize first letter...
String method = Character.toUpperCase(name.charAt(0))
@ -92,13 +92,23 @@ public final class ReflectionUtil {
Method m;
try {
// Try common 'get' first...
m = object.getClass().getMethod("get" + method);
m = clazz.getMethod("get" + method);
} catch (NoSuchMethodException e) {
// Try 'is' as a prefix
m = object.getClass().getMethod("is" + method);
m = clazz.getMethod("is" + method);
}
return m.invoke(object, (Object[]) null);
return m;
} catch (Exception e) {
throw new ReflectionException(e);
}
}
public static Object getter(Object object, String name)
throws ReflectionException {
try {
return getGetterMethod(object.getClass(), name).invoke(object,
(Object[]) null);
} catch (Exception e) {
throw new ReflectionException(e);
}

View file

@ -53,7 +53,7 @@
<!-- schedule the timer to purge outgoing directory -->
<route id="purgeOutgoingScheduled">
<from uri="purgeOutgoingCron" />
<from ref="purgeOutgoingCron" />
<bean ref="purgeOutgoing" method="purge" />
</route>
@ -67,7 +67,7 @@
<!-- Purge on Scheduled timer -->
<route id="purgeScheduled">
<from uri="purgeCron" />
<from ref="purgeCron" />
<doTry>
<bean ref="purgeManager" method="executePurge" />
<doCatch>
@ -80,7 +80,7 @@
<!-- schedule the timer to purge log directories -->
<route id="purgeLogScheduled">
<from uri="purgeLogsCron" />
<from ref="purgeLogsCron" />
<bean ref="purgeLogs" method="purge" />
</route>

View file

@ -16,6 +16,7 @@ Require-Bundle: com.raytheon.uf.common.serialization;bundle-version="1.12.1174",
com.raytheon.uf.common.serialization.comm;bundle-version="1.12.1174",
com.raytheon.uf.common.stats;bundle-version="1.0.0",
com.raytheon.uf.common.dataplugin;bundle-version="1.12.1174",
com.raytheon.uf.common.util,
org.springframework;bundle-version="2.5.6",
com.raytheon.uf.edex.core;bundle-version="1.12.1174",
com.raytheon.uf.common.status;bundle-version="1.12.1174",

View file

@ -12,7 +12,6 @@
<bean id="aggregateManager" class="com.raytheon.uf.edex.stats.AggregateManager">
<constructor-arg value="${stats.period}"/>
<constructor-arg value="${stats.scanInterval}"/>
</bean>
<bean id="edexStatsRegistered" factory-bean="clusteredCamelContextMgr"
@ -24,9 +23,10 @@
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"
autoStartup="false">
<endpoint id="statsScanTimer" uri="timer://scanStats?period=${stats.scanInterval}m"/>
<route id="statsTableScan">
<from uri="timer://scanStats?period=15m" />
<from ref="statsScanTimer" />
<doTry>
<bean ref="statsPurge" method="purgeAggregates"/>
<bean ref="aggregateManager" method="scan"/>

View file

@ -1,4 +1,4 @@
# scan interval of stats table in minutes
stats.scanInterval=15
stats.scanInterval=2
# bucket interval or period of when to aggregate in minutes
stats.period=2
stats.period=5

View file

@ -20,31 +20,30 @@
package com.raytheon.uf.edex.stats;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimeZone;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.raytheon.uf.common.event.Event;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.stats.StatsRecord;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.time.TimeRange;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.stats.dao.AggregateRecordDao;
import com.raytheon.uf.edex.stats.dao.StatsDao;
import com.raytheon.uf.edex.stats.handler.StatsHandler;
import com.raytheon.uf.edex.stats.util.ConfigLoader;
/**
@ -60,119 +59,67 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Stored the aggregate buckets in the db.
* Nov 07, 2012 1317 mpduff Updated Configuration Files.
*
* Nov 28, 2012 1350 rjpeter Simplified aggregation and added aggregation with current db aggregate records.
* </pre>
*
* @author jsanchez
*
*/
public class AggregateManager {
private class TimeRangeKey extends TimeRange {
private static final long serialVersionUID = 4603487307433273159L;
public TimeRangeKey(Calendar cal1, Calendar cal2) {
super(cal1, cal2);
}
@Override
public boolean equals(Object o) {
if (o != null && o instanceof TimeRange) {
TimeRange other = (TimeRange) o;
return getStart().equals(other.getStart())
&& getEnd().equals(other.getEnd());
}
return false;
}
@Override
public int hashCode() {
return 1;
}
@Override
public String toString() {
return super.toString();
}
}
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(AggregateManager.class);
/** In minutes */
private int bucketInterval;
/** In minutes */
private int scanInterval;
/** default value */
private static final int defaultBucketInterval = 5;
/** default value */
private static final int defaultScanInterval = 15;
/** loads localized copies of the statsConfig */
private final ConfigLoader configLoader;
private final CoreDao aggregateRecordDao = new CoreDao(DaoConfig.forClass(
"metadata", AggregateRecord.class));
public AggregateManager(String bucketInterval, String scanInterval)
throws Exception {
configLoader = new ConfigLoader();
validateIntervals(bucketInterval, scanInterval);
configLoader.load();
StatsHandler.setValidEventTypes(configLoader.getConfigurations());
public AggregateManager(String bucketInterval) {
validateIntervals(bucketInterval);
}
/**
* Performs the aggregation based on the statsConfig file.
* Aggregates the grouped events and then aggregates them with any matching
* range in DB and stores the updated aggregate.
*
* @param key
* @param data
* @param dao
* @param statsEvent
* @param timeRange
* @param groupedEvents
*/
private void aggregate(StatisticsEvent statsEvent, TimeRange timeRange,
List<Event> data) {
Calendar start = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
private void aggregate(AggregateRecordDao dao, StatisticsEvent statsEvent,
TimeRange timeRange, Multimap<String, Event> groupedEvents) {
Calendar start = TimeUtil.newCalendar(TimeZone.getTimeZone("GMT"));
start.setTime(timeRange.getStart());
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
Calendar end = TimeUtil.newCalendar(TimeZone.getTimeZone("GMT"));
end.setTime(timeRange.getEnd());
// collect grouping names from stats config
List<String> groupByColumns = new ArrayList<String>();
for (StatisticsGroup groupBy : statsEvent.getGroupList()) {
String column = groupBy.getName();
groupByColumns.add(column);
}
// breaks data into groups
Map<String, List<Event>> map = divideIntoGroups(data, groupByColumns);
// perform aggregate functions on the grouped data
for (String groupKey : map.keySet()) {
for (String groupKey : groupedEvents.keySet()) {
Collection<Event> groupData = groupedEvents.get(groupKey);
Iterator<Method> aggrMethodIter = statsEvent.getAggregateMethods()
.iterator();
Iterator<StatisticsAggregate> statAggrIter = statsEvent
.getAggregateList().iterator();
int count = groupData.size();
while (aggrMethodIter.hasNext() && statAggrIter.hasNext()) {
String field = statAggrIter.next().getField();
Method m = aggrMethodIter.next();
List<Event> groupData = map.get(groupKey);
for (StatisticsAggregate aggregate : statsEvent.getAggregateList()) {
String field = aggregate.getField();
try {
double[] values = new double[groupData.size()];
String methodName = getterMethodName(field);
for (int i = 0; i < groupData.size(); i++) {
Object obj = groupData.get(i);
Class<?> clazz = obj.getClass();
Method m = clazz.getMethod(methodName, new Class<?>[0]);
Number number = (Number) m.invoke(obj, new Object[0]);
values[i] = number.doubleValue();
}
double count = values.length;
double max = 0;
double max = -Double.MAX_VALUE;
double min = Double.MAX_VALUE;
double sum = 0;
for (double value : values) {
for (Event event : groupData) {
Number number = (Number) m.invoke(event, new Object[0]);
double value = number.doubleValue();
sum += value;
if (value > max) {
max = value;
@ -189,7 +136,7 @@ public class AggregateManager {
record.setMin(min);
record.setMax(max);
record.setCount(count);
aggregateRecordDao.persist(record);
dao.mergeRecord(record);
} catch (Exception e) {
statusHandler.error("Unable to aggregate '" + field + "'",
e);
@ -206,73 +153,13 @@ public class AggregateManager {
* @param date
* @return
*/
private TimeRangeKey createTimeRangeKey(Calendar date) {
private TimeRange createTimeRange(Calendar date) {
Calendar start = getBucketStartTime(date);
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
end.setTimeInMillis(start.getTimeInMillis());
end.add(Calendar.MINUTE, bucketInterval);
TimeRangeKey timeRangeKey = new TimeRangeKey(start, end);
return timeRangeKey;
}
/**
* Breaks the list of data into groups based on groupByColumns. The key is a
* concatenation of the column values (i.e. datatype.username). This method
* can group data to n-number of levels.
*
* @param data
* @param groupByColumns
* @return
*/
private Map<String, List<Event>> divideIntoGroups(List<Event> data,
List<String> groupByColumns) {
Map<String, List<Event>> map = new HashMap<String, List<Event>>();
map.put("", data);
for (String column : groupByColumns) {
List<Map<String, List<Event>>> listOfMaps = new ArrayList<Map<String, List<Event>>>();
for (String parent : map.keySet()) {
List<Event> list = map.get(parent);
listOfMaps.add(group(list, column, parent));
}
map.clear();
// replace map with grouped data
for (Map<String, List<Event>> m : listOfMaps) {
for (String k : m.keySet()) {
map.put(k, m.get(k));
}
}
}
return map;
}
/**
* Extracts the events from the stats records.
*
* @param records
* @return
*/
private List<Event> extractEvents(List<StatsRecord> records) {
List<Event> eventsList = new ArrayList<Event>(records.size());
for (StatsRecord record : records) {
try {
Event event = SerializationUtil.transformFromThrift(
Event.class, record.getEvent());
eventsList.add(event);
} catch (SerializationException e) {
statusHandler
.error("Error trying to transform event. Aggregation may be inaccurate. ",
e);
}
}
return eventsList;
return new TimeRange(start, end);
}
/**
@ -302,155 +189,133 @@ public class AggregateManager {
return start;
}
/**
* Returns the name of the getter method for the parameter
*
* @param parameter
* @return
*/
private String getterMethodName(String parameter) {
return "get" + parameter.substring(0, 1).toUpperCase()
+ parameter.substring(1);
}
/**
* Helper method to group data to one level.
*
* @param data
* @param column
* @param parent
* @return
*/
private Map<String, List<Event>> group(List<Event> data, String column,
String parent) {
Map<String, List<Event>> map = new HashMap<String, List<Event>>();
String methodName = getterMethodName(column);
for (Event rec : data) {
try {
Class<?> clazz = rec.getClass();
Method m = clazz.getMethod(methodName, new Class<?>[0]);
String value = column + ":"
+ String.valueOf(m.invoke(rec, new Object[0]));
if (parent.length() > 0) {
value = parent + "-" + value;
}
List<Event> list = map.get(value);
if (list == null) {
list = new ArrayList<Event>();
}
list.add(rec);
map.put(value, list);
} catch (Exception e) {
statusHandler.error("Error creating groups", e);
}
}
return map;
}
/**
* Retrieve StatRecords from the metadata.event.stats table. This method
* does not retrieve records of the current bucket.
*/
private void retrieveStatRecords(StatsDao statsRecordDao,
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
throws Exception {
Calendar current = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
for (StatisticsEvent event : statsConfig.getEvents()) {
String eventType = event.getType();
// Does not retrieve stat records of current bucket.
// this method should always return a valid array.
StatsRecord[] records = statsRecordDao.retrieveRecords(
getBucketStartTime(current), eventType);
sort(eventType, records, aggregateBuckets);
}
}
}
/**
* Scans the stats table to be stored in buckets and aggregate if necessary.
*/
public void scan() throws Exception {
long t0 = System.currentTimeMillis();
ConfigLoader configLoader = ConfigLoader.getInstance();
StatsDao statsRecordDao = new StatsDao();
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets = new HashMap<String, Map<TimeRangeKey, List<StatsRecord>>>();
AggregateRecordDao aggrRecordDao = new AggregateRecordDao();
// retrieves records and sorts in buckets
retrieveStatRecords(statsRecordDao, aggregateBuckets);
Map<String, StatisticsEvent> statsMap = configLoader.getTypeView();
// loops through map to aggregate buckets
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
for (StatisticsEvent event : statsConfig.getEvents()) {
String eventType = event.getType();
// latest time to pull
Calendar timeToProcess = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
int count = 0;
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
.get(eventType);
// map should never be null, since it will be set in the 'sort'
// method.
for (Iterator<Map.Entry<TimeRangeKey, List<StatsRecord>>> iter = map
.entrySet().iterator(); iter.hasNext();) {
Entry<TimeRangeKey, List<StatsRecord>> element = iter
.next();
TimeRangeKey tr = element.getKey();
List<StatsRecord> records = element.getValue();
if (!records.isEmpty()) {
List<Event> data = extractEvents(records);
aggregate(event, tr, data);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records",
e);
}
// process the events by type
for (Map.Entry<String, StatisticsEvent> entry : statsMap.entrySet()) {
String type = entry.getKey();
StatisticsEvent event = entry.getValue();
List<StatsRecord> records = null;
do {
// retrieve stats in blocks of 1000
records = statsRecordDao.retrieveRecords(timeToProcess, type,
2000);
if (!CollectionUtil.isNullOrEmpty(records)) {
// sort events into time buckets
Map<TimeRange, Multimap<String, Event>> timeMap = sort(
event, records);
for (Map.Entry<TimeRange, Multimap<String, Event>> timeMapEntry : timeMap
.entrySet()) {
aggregate(aggrRecordDao, event, timeMapEntry.getKey(),
timeMapEntry.getValue());
}
iter.remove();
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records", e);
}
count += records.size();
}
}
} while (!CollectionUtil.isNullOrEmpty(records));
}
long t1 = System.currentTimeMillis();
statusHandler.info("Aggregated " + count + " stat events in "
+ (t1 - t0) + " ms");
}
/**
* Stores the results into proper aggregate buckets. This method assumes
* that the records are in date order.
* Sorts the records into time buckets and groups by the underlying Event.
*
* @param events
*/
private void sort(String eventType, StatsRecord[] records,
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
throws Exception {
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
.get(eventType);
if (map == null) {
map = new HashMap<TimeRangeKey, List<StatsRecord>>();
aggregateBuckets.put(eventType, map);
}
TimeRangeKey timeRange = null;
for (StatsRecord record : records) {
if (timeRange == null
|| !timeRange.contains(record.getDate().getTime())) {
// Create bucket based on stats record date
timeRange = createTimeRangeKey(record.getDate());
}
List<StatsRecord> bucketList = map.get(timeRange);
if (bucketList == null) {
bucketList = new ArrayList<StatsRecord>();
map.put(timeRange, bucketList);
}
bucketList.add(record);
}
}
/**
* Tests if the bucket interval and the scan interval are valid values. If
* values are invalid then values will be set to default values.
*
* @param bucketInt
* @param scanInt
* @param records
* @return
*/
private void validateIntervals(String bucketInt, String scanInt) {
/**
 * Sorts the raw stats rows into bucket-interval time ranges and, within
 * each range, groups the deserialized events by their configured group-by
 * key (built as "field:value-field:value").
 *
 * NOTE(review): the current-bucket fast path assumes records arrive in
 * date order; an out-of-order record still lands in the correct bucket via
 * the rval lookup — confirm the ordering guarantee from the caller's query.
 *
 * @param statEvent
 *            configuration supplying the group-by getter methods and field
 *            names
 * @param records
 *            raw stats rows containing thrift-serialized events
 * @return map of bucket time range to events keyed by group string
 */
private Map<TimeRange, Multimap<String, Event>> sort(
StatisticsEvent statEvent, List<StatsRecord> records) {
Map<TimeRange, Multimap<String, Event>> rval = new HashMap<TimeRange, Multimap<String, Event>>();
TimeRange timeRange = null;
Multimap<String, Event> eventsByGroup = null;
final Object[] EMPTY_OBJ_ARR = new Object[0];
// reused across records to avoid reallocating per event
StringBuilder group = new StringBuilder();
for (StatsRecord record : records) {
if ((timeRange == null)
|| !timeRange.contains(record.getDate().getTime())) {
// Create bucket based on stats record date
timeRange = createTimeRange(record.getDate());
eventsByGroup = rval.get(timeRange);
if (eventsByGroup == null) {
eventsByGroup = ArrayListMultimap.create();
rval.put(timeRange, eventsByGroup);
}
}
try {
// get underlying event
Event event = SerializationUtil.transformFromThrift(
Event.class, record.getEvent());
// determine group
boolean addDelim = false;
// the two lists are iterated in parallel: one getter per field name
Iterator<Method> gMethodIter = statEvent.getGroupByMethods()
.iterator();
Iterator<StatisticsGroup> gFieldNameIter = statEvent
.getGroupList().iterator();
group.setLength(0);
while (gMethodIter.hasNext() && gFieldNameIter.hasNext()) {
Method m = gMethodIter.next();
String field = gFieldNameIter.next().getName();
String gVal = String
.valueOf(m.invoke(event, EMPTY_OBJ_ARR));
if (addDelim) {
group.append('-');
} else {
addDelim = true;
}
group.append(field).append(':').append(gVal);
}
eventsByGroup.put(group.toString(), event);
} catch (Exception e) {
// drop this record but keep processing the rest
statusHandler
.error("Error processing event. Aggregation may be inaccurate. ",
e);
}
}
return rval;
}
/**
* Tests if the bucket interval is a valid value. If value is invalid then
* value will be set to default value.
*
* @param bucketInt
* @return
*/
private void validateIntervals(String bucketInt) {
try {
bucketInterval = Integer.parseInt(bucketInt);
} catch (NumberFormatException e) {
@ -460,26 +325,6 @@ public class AggregateManager {
+ defaultBucketInterval + "'");
}
try {
scanInterval = Integer.parseInt(scanInt);
} catch (NumberFormatException e) {
scanInterval = defaultScanInterval;
statusHandler.info("'" + scanInt
+ "' is not a valid scan interval value. Setting to '"
+ defaultScanInterval + "'");
}
if (scanInterval < bucketInterval) {
scanInterval = defaultBucketInterval;
bucketInterval = defaultBucketInterval;
statusHandler
.info("The bucket interval can not be greater than the scan interval. Setting scan interval to '"
+ defaultBucketInterval
+ "' and bucket interval to '"
+ bucketInterval
+ "'");
}
int incrementsWithinHour = bucketInterval;
// checks if period is larger than 60 minutes
if (bucketInterval > 60) {
@ -492,5 +337,4 @@ public class AggregateManager {
+ bucketInterval + "'");
}
}
}

View file

@ -0,0 +1,64 @@
package com.raytheon.uf.edex.stats.dao;
import java.util.List;
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
/**
 * Data access object for aggregate statistics records in the metadata
 * database.
 */
public class AggregateRecordDao extends CoreDao {
/**
 * Creates a new data access object
 */
public AggregateRecordDao() {
super(DaoConfig.forClass("metadata", AggregateRecord.class));
}
/**
 * Merges the given aggregate with any existing record covering the same
 * event type, field, grouping, and start/end dates. If a matching record
 * exists, its count and sum are added to and its min/max are widened as
 * needed, then it is updated; otherwise the new record is persisted as-is.
 *
 * @param newRecord
 *            the freshly computed aggregate to fold into the database
 * @throws DataAccessLayerException
 *             if the lookup or save fails
 */
@SuppressWarnings("unchecked")
public void mergeRecord(AggregateRecord newRecord)
throws DataAccessLayerException {
DatabaseQuery query = new DatabaseQuery(AggregateRecord.class);
query.addQueryParam("eventType", newRecord.getEventType(),
QueryOperand.EQUALS);
query.addQueryParam("field", newRecord.getField(), QueryOperand.EQUALS);
query.addQueryParam("grouping", newRecord.getGrouping(),
QueryOperand.EQUALS);
query.addQueryParam("startDate", newRecord.getStartDate(),
QueryOperand.EQUALS);
query.addQueryParam("endDate", newRecord.getEndDate(),
QueryOperand.EQUALS);
List<AggregateRecord> results = (List<AggregateRecord>) queryByCriteria(query);
if (!CollectionUtil.isNullOrEmpty(results)) {
// shouldn't be able to get multiple results, just merge with first
// and update
AggregateRecord prevRecord = results.get(0);
prevRecord.setCount(prevRecord.getCount() + newRecord.getCount());
prevRecord.setSum(prevRecord.getSum() + newRecord.getSum());
if (newRecord.getMin() < prevRecord.getMin()) {
prevRecord.setMin(newRecord.getMin());
}
if (newRecord.getMax() > prevRecord.getMax()) {
prevRecord.setMax(newRecord.getMax());
}
update(prevRecord);
} else {
// no existing bucket for this key: first write wins
persist(newRecord);
}
}
}

View file

@ -5,17 +5,12 @@ import java.util.List;
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
import com.raytheon.uf.common.stats.StatsRecord;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
public class StatsDao extends CoreDao {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(StatsDao.class);
/**
* Creates a new data access object
*/
@ -28,30 +23,24 @@ public class StatsDao extends CoreDao {
*
* @param limit
* @param eventType
* @param maxResults
* if greater than 0 will limit database results to maxResults
* @return an array of stat records. If an error occurs, then an array of
* size 0 will be returned.
* @throws DataAccessLayerException
*/
public StatsRecord[] retrieveRecords(Calendar limit, String eventType) {
@SuppressWarnings("unchecked")
public List<StatsRecord> retrieveRecords(Calendar limit, String eventType,
int maxResults) throws DataAccessLayerException {
DatabaseQuery query = new DatabaseQuery(StatsRecord.class);
query.addQueryParam("eventType", eventType, QueryOperand.EQUALS);
query.addQueryParam("date", limit, QueryOperand.LESSTHAN);
query.setMaxResults(1000);
query.addOrder("date", true);
// TODO Need to make StatsDao to keep track to determine next 1000
// results.
StatsRecord[] records = null;
try {
List<?> objects = queryByCriteria(query);
records = new StatsRecord[objects.size()];
for (int i = 0; i < records.length; i++) {
records[i] = (StatsRecord) objects.get(i);
}
} catch (DataAccessLayerException e) {
records = new StatsRecord[0];
statusHandler.error("Error querying the stats table", e);
if (maxResults > 0) {
query.setMaxResults(maxResults);
}
return records;
return (List<StatsRecord>) queryByCriteria(query);
}
}

View file

@ -43,17 +43,17 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
/**
* Graph Data Request Handler.
*
*
* <pre>
*
*
* SOFTWARE HISTORY
*
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 11, 2012 728 mpduff Initial creation
*
*
* </pre>
*
*
* @author mpduff
* @version 1.0
*/
@ -98,13 +98,13 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
/**
* Get the statistical configuration objects and add them to the response.
*
*
* @return GraphDataResponse
*
*
* @throws Exception
*/
private GraphDataResponse getMetaData() throws Exception {
ConfigLoader loader = new ConfigLoader();
ConfigLoader loader = ConfigLoader.getInstance();
loader.load();
List<StatisticsConfig> configList = loader.getConfigurations();
GraphDataResponse response = new GraphDataResponse();
@ -115,7 +115,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
/**
* Get the Graph Data object and add it to the response.
*
*
* @param request
* The request object
* @return GraphDataResponse
@ -136,8 +136,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
}
if (request.getField() != null) {
query.addQueryParam(FIELD, request.getField(),
QueryOperand.EQUALS);
query.addQueryParam(FIELD, request.getField(), QueryOperand.EQUALS);
}
List<?> results = dao.queryByCriteria(query);
@ -171,7 +170,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
/**
* Convert a Date object to Calendar object.
*
*
* @param date
* @return Calendar object
*/
@ -184,7 +183,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
/**
* Get the display unit.
*
*
* @param dataType
* @param type
* @param category
@ -192,7 +191,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
*/
private String getDisplayUnit(String category, String type, String dataType)
throws Exception {
ConfigLoader loader = new ConfigLoader();
ConfigLoader loader = ConfigLoader.getInstance();
loader.load();
List<StatisticsConfig> configList = loader.getConfigurations();

View file

@ -97,7 +97,7 @@ public class StatsHandler {
* @throws Exception
*/
protected void loadEventValidTypes() throws Exception {
ConfigLoader configLoader = new ConfigLoader();
ConfigLoader configLoader = ConfigLoader.getInstance();
configLoader.load();
HashSet<String> myValidEventTypes = new HashSet<String>();

View file

@ -20,11 +20,17 @@
package com.raytheon.uf.edex.stats.util;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.bind.JAXBException;
import com.raytheon.uf.common.event.Event;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
@ -34,113 +40,246 @@ import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.ReflectionException;
import com.raytheon.uf.common.util.ReflectionUtil;
/**
* Loads StatisticsConfig files from localization.
*
*
* <pre>
*
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Updated error handling and validated config files.
* Nov 07, 2012 1317 mpduff Update config files.
*
* Nov 29, 2012 1350 rjpeter Updated to static, fixed localization, increased validation.
* </pre>
*
*
* @author jsanchez
*
*
*/
public class ConfigLoader {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(ConfigLoader.class);
private static final JAXBManager jaxbManager;
static {
private final JAXBManager jaxbManager;
private static final ConfigLoader instance = new ConfigLoader();
private final IPathManager pm = PathManagerFactory.getPathManager();
private List<StatisticsConfig> configurations = Collections.emptyList();
private Map<String, StatisticsEvent> classToEventMap = Collections
.emptyMap();
private static final String STATS_DIR = "stats";
public static ConfigLoader getInstance() {
return instance;
}
private ConfigLoader() {
try {
jaxbManager = new JAXBManager(StatisticsConfig.class);
} catch (JAXBException e) {
load();
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
private final IPathManager pm = PathManagerFactory.getPathManager();
private final List<StatisticsConfig> configurations = new ArrayList<StatisticsConfig>();
private final String STATS_DIR = "stats";
/**
* Returns a list of all StatisticsConfig files.
*
*
* @return
*/
public List<StatisticsConfig> getConfigurations() {
return configurations;
}
/**
* Returns a map of event type to statistics event.
*
* @return
*/
public Map<String, StatisticsEvent> getTypeView() {
return classToEventMap;
}
/**
* Loads the StatisticsConfig files in the STATS_DIR directory.
*/
public void load() throws Exception {
LocalizationContext[] searchContext = pm
.getLocalSearchHierarchy(LocalizationType.EDEX_STATIC);
LocalizationFile[] localizationFiles = null;
Map<String, LocalizationFile> statConfs = new HashMap<String, LocalizationFile>();
// grab all stats from contexts, allowing overwrite by name
for (LocalizationContext ctx : searchContext) {
localizationFiles = pm.listFiles(ctx, STATS_DIR, null, false, true);
if (localizationFiles != null && localizationFiles.length > 0) {
break;
LocalizationFile[] localizationFiles = pm.listFiles(ctx, STATS_DIR,
null, false, true);
for (LocalizationFile lf : localizationFiles) {
String name = lf.getName();
if (!statConfs.containsKey(name)) {
statConfs.put(name, lf);
}
}
}
if (localizationFiles != null && localizationFiles.length > 0) {
configurations.clear();
for (LocalizationFile localizationFile : localizationFiles) {
if (localizationFile.getFile() != null
&& localizationFile.getFile().exists()) {
StatisticsConfig config = (StatisticsConfig) jaxbManager
.jaxbUnmarshalFromXmlFile(localizationFile
.getFile());
config = validateAggregates(config);
configurations.add(config);
if (!statConfs.isEmpty()) {
List<StatisticsConfig> myConfigurations = new ArrayList<StatisticsConfig>(
statConfs.size());
Map<String, StatisticsEvent> myEvents = new HashMap<String, StatisticsEvent>();
for (LocalizationFile lf : statConfs.values()) {
StatisticsConfig config = lf.jaxbUnmarshal(
StatisticsConfig.class, jaxbManager);
if (config != null) {
validate(myEvents, config);
if (!config.getEvents().isEmpty()) {
myConfigurations.add(config);
}
}
}
configurations = myConfigurations;
classToEventMap = myEvents;
}
}
/**
* Removes the aggregate if its not a numerical parameter.
*
* Validates the StatisticsConfig remove invalid
* events/aggregates/duplicates.
*
* @param config
*/
private StatisticsConfig validateAggregates(StatisticsConfig config)
throws ClassNotFoundException {
List<StatisticsAggregate> aggregates = new ArrayList<StatisticsAggregate>();
for (StatisticsEvent event : config.getEvents()) {
Class<?> clazz = Class.forName(event.getType());
aggregates = new ArrayList<StatisticsAggregate>();
for (StatisticsAggregate aggregate : event.getAggregateList()) {
String aggregateField = aggregate.getField();
private void validate(Map<String, StatisticsEvent> eventMap,
StatisticsConfig config) {
for (Iterator<StatisticsEvent> iter = config.getEvents().iterator(); iter
.hasNext();) {
StatisticsEvent event = iter.next();
String eventType = event.getType();
if (!eventMap.containsKey(eventType)) {
try {
Field field = clazz.getDeclaredField(aggregateField);
if (!field.getType().isPrimitive()) {
statusHandler
.info("'"
+ aggregateField
+ "' not a primitive type. Aggregate being removed. ");
}
aggregates.add(aggregate);
} catch (NoSuchFieldException e) {
statusHandler.info("'" + aggregateField
+ "' not a valid field. Aggregate being removed. ");
}
}
event.setAggregateList(aggregates);
}
Class<?> clazz = Class.forName(eventType);
// verify the type is an Event
clazz.asSubclass(Event.class);
return config;
// validate groupBy fields can be found
List<StatisticsGroup> groups = event.getGroupList();
if (groups != null) {
List<Method> groupByMethods = new ArrayList<Method>(
groups.size());
Set<String> currentFields = new HashSet<String>();
for (Iterator<StatisticsGroup> groupIter = groups
.iterator(); groupIter.hasNext();) {
StatisticsGroup group = groupIter.next();
String fieldName = group.getName();
if (!currentFields.contains(fieldName)) {
try {
Method m = ReflectionUtil.getGetterMethod(
clazz, fieldName);
groupByMethods.add(m);
currentFields.add(fieldName);
} catch (ReflectionException e) {
groupIter.remove();
statusHandler
.warn("'"
+ fieldName
+ "' does not have getter method. Group being removed");
}
} else {
statusHandler
.warn("'"
+ fieldName
+ "' already defined. Duplicate group being removed.");
}
}
event.setGroupByMethods(groupByMethods);
}
// validate aggregate methods can be found and are primitive
List<StatisticsAggregate> curAggregates = event
.getAggregateList();
List<Method> aggregateMethods = new ArrayList<Method>(
curAggregates.size());
Set<String> currentFields = new HashSet<String>();
for (Iterator<StatisticsAggregate> aggrIter = curAggregates
.iterator(); aggrIter.hasNext();) {
StatisticsAggregate aggregate = aggrIter.next();
String aggregateField = aggregate.getField();
try {
Field field = clazz
.getDeclaredField(aggregateField);
if (field.getType().isPrimitive()) {
if (!currentFields.contains(aggregateField)) {
try {
Method m = ReflectionUtil
.getGetterMethod(clazz,
aggregateField);
aggregateMethods.add(m);
currentFields.add(aggregateField);
} catch (ReflectionException e) {
aggrIter.remove();
statusHandler
.warn("'"
+ aggregateField
+ "' does not have getter method. Aggregate being removed");
}
} else {
aggrIter.remove();
statusHandler
.warn("'"
+ aggregateField
+ "' already defined. Duplicate aggregate being removed. ");
}
} else {
aggrIter.remove();
statusHandler
.warn("'"
+ aggregateField
+ "' not a primitive type. Aggregate being removed. ");
}
} catch (NoSuchFieldException e) {
aggrIter.remove();
statusHandler
.warn("'"
+ aggregateField
+ "' not a valid field. Aggregate being removed. ");
}
}
if (!curAggregates.isEmpty()) {
event.setAggregateMethods(aggregateMethods);
eventMap.put(eventType, event);
} else {
iter.remove();
}
} catch (ClassNotFoundException e) {
iter.remove();
statusHandler.warn("'" + eventType
+ "' not a valid type. Type being removed. ");
} catch (ClassCastException e) {
iter.remove();
statusHandler.warn("'" + eventType
+ "' not an Event type. Type being removed. ");
}
} else {
iter.remove();
statusHandler
.warn("'"
+ eventType
+ "' is already defined. StatisticsEvent being skipped.");
}
}
}
}