Issue #1350: Fix stats aggregation
Change-Id: I652db6f6f5bc0cab8fc9e85969bf57f1c2382da3 Former-commit-id:71574f210d
[formerly94df67d354
] [formerly2b124db62a
[formerly b06d51266d600e0b04f7fe10bc36e312a2275428]] Former-commit-id:2b124db62a
Former-commit-id:5a45254db0
This commit is contained in:
parent
fe932e395c
commit
99f19186ef
13 changed files with 531 additions and 413 deletions
|
@ -34,6 +34,7 @@ import com.raytheon.uf.common.localization.ILocalizationAdapter.ListResponse;
|
|||
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
|
||||
import com.raytheon.uf.common.localization.exception.LocalizationException;
|
||||
import com.raytheon.uf.common.localization.exception.LocalizationOpFailedException;
|
||||
import com.raytheon.uf.common.serialization.JAXBManager;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
|
@ -165,7 +166,7 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
|
|||
private LocalizationLevel protectedLevel;
|
||||
|
||||
/** File changed observers */
|
||||
private Set<ILocalizationFileObserver> observers = new HashSet<ILocalizationFileObserver>();
|
||||
private final Set<ILocalizationFileObserver> observers = new HashSet<ILocalizationFileObserver>();
|
||||
|
||||
/** Flag to set if file has been requested */
|
||||
protected boolean fileRequested = false;
|
||||
|
@ -187,8 +188,8 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
|
|||
* @return
|
||||
*/
|
||||
boolean isNull() {
|
||||
return adapter == null && path == null && context == null
|
||||
&& file == null;
|
||||
return (adapter == null) && (path == null) && (context == null)
|
||||
&& (file == null);
|
||||
}
|
||||
|
||||
LocalizationFile(ILocalizationAdapter adapter, LocalizationContext context,
|
||||
|
@ -279,7 +280,7 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
|
|||
adapter.retrieve(this);
|
||||
}
|
||||
|
||||
if (isDirectory == false && !file.exists()) {
|
||||
if ((isDirectory == false) && !file.exists()) {
|
||||
try {
|
||||
file.getParentFile().mkdirs();
|
||||
} catch (Throwable t) {
|
||||
|
@ -352,8 +353,8 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
|
|||
// Read in the bytes
|
||||
int offset = 0;
|
||||
int numRead = 0;
|
||||
while (offset < rval.length
|
||||
&& (numRead = is.read(rval, offset, rval.length - offset)) >= 0) {
|
||||
while ((offset < rval.length)
|
||||
&& ((numRead = is.read(rval, offset, rval.length - offset)) >= 0)) {
|
||||
offset += numRead;
|
||||
}
|
||||
|
||||
|
@ -548,7 +549,7 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
|
|||
* @return true if the file exists
|
||||
*/
|
||||
public boolean exists() {
|
||||
return isNull() == false && adapter.exists(this);
|
||||
return (isNull() == false) && adapter.exists(this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -596,6 +597,44 @@ public final class LocalizationFile implements Comparable<LocalizationFile> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the object version of this jaxb serialized file. Returns null if
|
||||
* the file does not exist or is empty.
|
||||
*
|
||||
* @param <T>
|
||||
* @param resultClass
|
||||
* @param manager
|
||||
* @return
|
||||
* @throws LocalizationException
|
||||
*/
|
||||
public <T> T jaxbUnmarshal(Class<T> resultClass, JAXBManager manager)
|
||||
throws LocalizationException {
|
||||
File f = getFile();
|
||||
if (f.exists() && (f.length() > 0)) {
|
||||
InputStream is = null;
|
||||
try {
|
||||
is = openInputStream();
|
||||
T object = resultClass.cast(manager
|
||||
.jaxbUnmarshalFromInputStream(is));
|
||||
return object;
|
||||
} catch (Exception e) {
|
||||
throw new LocalizationException("Could not unmarshal file "
|
||||
+ file.getName(), e);
|
||||
} finally {
|
||||
if (is != null) {
|
||||
try {
|
||||
is.close();
|
||||
} catch (IOException e) {
|
||||
statusHandler.handle(Priority.WARN,
|
||||
"Failed to close input stream for file", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return context + IPathManager.SEPARATOR + path;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
**/
|
||||
package com.raytheon.uf.common.stats.xml;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
|
@ -28,6 +29,7 @@ import javax.xml.bind.annotation.XmlElement;
|
|||
import javax.xml.bind.annotation.XmlElements;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import com.raytheon.uf.common.event.Event;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
||||
|
||||
|
@ -72,6 +74,12 @@ public class StatisticsEvent {
|
|||
@DynamicSerializeElement
|
||||
private List<StatisticsAggregate> aggregateList;
|
||||
|
||||
private Class<? extends Event> typeClass = null;
|
||||
|
||||
private List<Method> groupByMethods = null;
|
||||
|
||||
private List<Method> aggregateMethods = null;
|
||||
|
||||
/**
|
||||
* @return the aggregateList
|
||||
*/
|
||||
|
@ -146,4 +154,29 @@ public class StatisticsEvent {
|
|||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Class<? extends Event> getTypeClass() {
|
||||
return typeClass;
|
||||
}
|
||||
|
||||
public void setTypeClass(Class<? extends Event> typeClass) {
|
||||
this.typeClass = typeClass;
|
||||
}
|
||||
|
||||
public List<Method> getGroupByMethods() {
|
||||
return groupByMethods;
|
||||
}
|
||||
|
||||
public void setGroupByMethods(List<Method> groupByMethods) {
|
||||
this.groupByMethods = groupByMethods;
|
||||
}
|
||||
|
||||
public List<Method> getAggregateMethods() {
|
||||
return aggregateMethods;
|
||||
}
|
||||
|
||||
public void setAggregateMethods(List<Method> aggregateMethods) {
|
||||
this.aggregateMethods = aggregateMethods;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public final class ReflectionUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static Object getter(Object object, String name)
|
||||
public static Method getGetterMethod(Class<?> clazz, String name)
|
||||
throws ReflectionException {
|
||||
// Assume camel cased names - capitalize first letter...
|
||||
String method = Character.toUpperCase(name.charAt(0))
|
||||
|
@ -92,13 +92,23 @@ public final class ReflectionUtil {
|
|||
Method m;
|
||||
try {
|
||||
// Try common 'get' first...
|
||||
m = object.getClass().getMethod("get" + method);
|
||||
m = clazz.getMethod("get" + method);
|
||||
} catch (NoSuchMethodException e) {
|
||||
// Try 'is' as a prefix
|
||||
m = object.getClass().getMethod("is" + method);
|
||||
m = clazz.getMethod("is" + method);
|
||||
}
|
||||
|
||||
return m.invoke(object, (Object[]) null);
|
||||
return m;
|
||||
} catch (Exception e) {
|
||||
throw new ReflectionException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static Object getter(Object object, String name)
|
||||
throws ReflectionException {
|
||||
try {
|
||||
return getGetterMethod(object.getClass(), name).invoke(object,
|
||||
(Object[]) null);
|
||||
} catch (Exception e) {
|
||||
throw new ReflectionException(e);
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
|
||||
<!-- schedule the timer to purge outgoing directory -->
|
||||
<route id="purgeOutgoingScheduled">
|
||||
<from uri="purgeOutgoingCron" />
|
||||
<from ref="purgeOutgoingCron" />
|
||||
<bean ref="purgeOutgoing" method="purge" />
|
||||
</route>
|
||||
|
||||
|
@ -67,7 +67,7 @@
|
|||
|
||||
<!-- Purge on Scheduled timer -->
|
||||
<route id="purgeScheduled">
|
||||
<from uri="purgeCron" />
|
||||
<from ref="purgeCron" />
|
||||
<doTry>
|
||||
<bean ref="purgeManager" method="executePurge" />
|
||||
<doCatch>
|
||||
|
@ -80,7 +80,7 @@
|
|||
|
||||
<!-- schedule the timer to purge log directories -->
|
||||
<route id="purgeLogScheduled">
|
||||
<from uri="purgeLogsCron" />
|
||||
<from ref="purgeLogsCron" />
|
||||
<bean ref="purgeLogs" method="purge" />
|
||||
</route>
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ Require-Bundle: com.raytheon.uf.common.serialization;bundle-version="1.12.1174",
|
|||
com.raytheon.uf.common.serialization.comm;bundle-version="1.12.1174",
|
||||
com.raytheon.uf.common.stats;bundle-version="1.0.0",
|
||||
com.raytheon.uf.common.dataplugin;bundle-version="1.12.1174",
|
||||
com.raytheon.uf.common.util,
|
||||
org.springframework;bundle-version="2.5.6",
|
||||
com.raytheon.uf.edex.core;bundle-version="1.12.1174",
|
||||
com.raytheon.uf.common.status;bundle-version="1.12.1174",
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
|
||||
<bean id="aggregateManager" class="com.raytheon.uf.edex.stats.AggregateManager">
|
||||
<constructor-arg value="${stats.period}"/>
|
||||
<constructor-arg value="${stats.scanInterval}"/>
|
||||
</bean>
|
||||
|
||||
<bean id="edexStatsRegistered" factory-bean="clusteredCamelContextMgr"
|
||||
|
@ -24,9 +23,10 @@
|
|||
xmlns="http://camel.apache.org/schema/spring"
|
||||
errorHandlerRef="errorHandler"
|
||||
autoStartup="false">
|
||||
<endpoint id="statsScanTimer" uri="timer://scanStats?period=${stats.scanInterval}m"/>
|
||||
|
||||
<route id="statsTableScan">
|
||||
<from uri="timer://scanStats?period=15m" />
|
||||
<from ref="statsScanTimer" />
|
||||
<doTry>
|
||||
<bean ref="statsPurge" method="purgeAggregates"/>
|
||||
<bean ref="aggregateManager" method="scan"/>
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# scan interval of stats table in minutes
|
||||
stats.scanInterval=15
|
||||
stats.scanInterval=2
|
||||
# bucket interval or period of when to aggregate in minutes
|
||||
stats.period=2
|
||||
stats.period=5
|
|
@ -20,31 +20,30 @@
|
|||
package com.raytheon.uf.edex.stats;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.raytheon.uf.common.event.Event;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.stats.AggregateRecord;
|
||||
import com.raytheon.uf.common.stats.StatsRecord;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.time.TimeRange;
|
||||
import com.raytheon.uf.edex.database.dao.CoreDao;
|
||||
import com.raytheon.uf.edex.database.dao.DaoConfig;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
import com.raytheon.uf.edex.stats.dao.AggregateRecordDao;
|
||||
import com.raytheon.uf.edex.stats.dao.StatsDao;
|
||||
import com.raytheon.uf.edex.stats.handler.StatsHandler;
|
||||
import com.raytheon.uf.edex.stats.util.ConfigLoader;
|
||||
|
||||
/**
|
||||
|
@ -60,119 +59,67 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* Aug 21, 2012 jsanchez Stored the aggregate buckets in the db.
|
||||
* Nov 07, 2012 1317 mpduff Updated Configuration Files.
|
||||
*
|
||||
* Nov 28, 2012 1350 rjpeter Simplied aggregation and added aggregation with current db aggregate records.
|
||||
* </pre>
|
||||
*
|
||||
* @author jsanchez
|
||||
*
|
||||
*/
|
||||
public class AggregateManager {
|
||||
private class TimeRangeKey extends TimeRange {
|
||||
private static final long serialVersionUID = 4603487307433273159L;
|
||||
|
||||
public TimeRangeKey(Calendar cal1, Calendar cal2) {
|
||||
super(cal1, cal2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o != null && o instanceof TimeRange) {
|
||||
TimeRange other = (TimeRange) o;
|
||||
|
||||
return getStart().equals(other.getStart())
|
||||
&& getEnd().equals(other.getEnd());
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(AggregateManager.class);
|
||||
|
||||
/** In minutes */
|
||||
private int bucketInterval;
|
||||
|
||||
/** In minutes */
|
||||
private int scanInterval;
|
||||
|
||||
/** default value */
|
||||
private static final int defaultBucketInterval = 5;
|
||||
|
||||
/** default value */
|
||||
private static final int defaultScanInterval = 15;
|
||||
|
||||
/** loads localized copies of the statsConfig */
|
||||
private final ConfigLoader configLoader;
|
||||
|
||||
private final CoreDao aggregateRecordDao = new CoreDao(DaoConfig.forClass(
|
||||
"metadata", AggregateRecord.class));
|
||||
|
||||
public AggregateManager(String bucketInterval, String scanInterval)
|
||||
throws Exception {
|
||||
configLoader = new ConfigLoader();
|
||||
validateIntervals(bucketInterval, scanInterval);
|
||||
configLoader.load();
|
||||
StatsHandler.setValidEventTypes(configLoader.getConfigurations());
|
||||
public AggregateManager(String bucketInterval) {
|
||||
validateIntervals(bucketInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the aggregation based on the statsConfig file.
|
||||
* Aggregates the grouped events and then aggregates them with any matching
|
||||
* range in DB and stores the updated aggregate.
|
||||
*
|
||||
* @param key
|
||||
* @param data
|
||||
* @param dao
|
||||
* @param statsEvent
|
||||
* @param timeRange
|
||||
* @param groupedEvents
|
||||
*/
|
||||
private void aggregate(StatisticsEvent statsEvent, TimeRange timeRange,
|
||||
List<Event> data) {
|
||||
Calendar start = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
|
||||
private void aggregate(AggregateRecordDao dao, StatisticsEvent statsEvent,
|
||||
TimeRange timeRange, Multimap<String, Event> groupedEvents) {
|
||||
Calendar start = TimeUtil.newCalendar(TimeZone.getTimeZone("GMT"));
|
||||
start.setTime(timeRange.getStart());
|
||||
|
||||
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
|
||||
Calendar end = TimeUtil.newCalendar(TimeZone.getTimeZone("GMT"));
|
||||
end.setTime(timeRange.getEnd());
|
||||
|
||||
// collect grouping names from stats config
|
||||
List<String> groupByColumns = new ArrayList<String>();
|
||||
for (StatisticsGroup groupBy : statsEvent.getGroupList()) {
|
||||
String column = groupBy.getName();
|
||||
groupByColumns.add(column);
|
||||
}
|
||||
|
||||
// breaks data into groups
|
||||
Map<String, List<Event>> map = divideIntoGroups(data, groupByColumns);
|
||||
|
||||
// perform aggregate functions on the grouped data
|
||||
for (String groupKey : map.keySet()) {
|
||||
for (String groupKey : groupedEvents.keySet()) {
|
||||
Collection<Event> groupData = groupedEvents.get(groupKey);
|
||||
Iterator<Method> aggrMethodIter = statsEvent.getAggregateMethods()
|
||||
.iterator();
|
||||
Iterator<StatisticsAggregate> statAggrIter = statsEvent
|
||||
.getAggregateList().iterator();
|
||||
int count = groupData.size();
|
||||
|
||||
while (aggrMethodIter.hasNext() && statAggrIter.hasNext()) {
|
||||
String field = statAggrIter.next().getField();
|
||||
Method m = aggrMethodIter.next();
|
||||
|
||||
List<Event> groupData = map.get(groupKey);
|
||||
for (StatisticsAggregate aggregate : statsEvent.getAggregateList()) {
|
||||
String field = aggregate.getField();
|
||||
try {
|
||||
double[] values = new double[groupData.size()];
|
||||
String methodName = getterMethodName(field);
|
||||
for (int i = 0; i < groupData.size(); i++) {
|
||||
Object obj = groupData.get(i);
|
||||
Class<?> clazz = obj.getClass();
|
||||
Method m = clazz.getMethod(methodName, new Class<?>[0]);
|
||||
Number number = (Number) m.invoke(obj, new Object[0]);
|
||||
values[i] = number.doubleValue();
|
||||
}
|
||||
|
||||
double count = values.length;
|
||||
double max = 0;
|
||||
double max = -Double.MAX_VALUE;
|
||||
double min = Double.MAX_VALUE;
|
||||
double sum = 0;
|
||||
|
||||
for (double value : values) {
|
||||
for (Event event : groupData) {
|
||||
Number number = (Number) m.invoke(event, new Object[0]);
|
||||
double value = number.doubleValue();
|
||||
sum += value;
|
||||
if (value > max) {
|
||||
max = value;
|
||||
|
@ -189,7 +136,7 @@ public class AggregateManager {
|
|||
record.setMin(min);
|
||||
record.setMax(max);
|
||||
record.setCount(count);
|
||||
aggregateRecordDao.persist(record);
|
||||
dao.mergeRecord(record);
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Unable to aggregate '" + field + "'",
|
||||
e);
|
||||
|
@ -206,73 +153,13 @@ public class AggregateManager {
|
|||
* @param date
|
||||
* @return
|
||||
*/
|
||||
private TimeRangeKey createTimeRangeKey(Calendar date) {
|
||||
private TimeRange createTimeRange(Calendar date) {
|
||||
Calendar start = getBucketStartTime(date);
|
||||
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
|
||||
end.setTimeInMillis(start.getTimeInMillis());
|
||||
end.add(Calendar.MINUTE, bucketInterval);
|
||||
|
||||
TimeRangeKey timeRangeKey = new TimeRangeKey(start, end);
|
||||
|
||||
return timeRangeKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Breaks the list of data into groups based on groupByColumns. The key is a
|
||||
* concatenation of the column values (i.e. datatype.username). This method
|
||||
* can group data to n-number of levels.
|
||||
*
|
||||
* @param data
|
||||
* @param groupByColumns
|
||||
* @return
|
||||
*/
|
||||
private Map<String, List<Event>> divideIntoGroups(List<Event> data,
|
||||
List<String> groupByColumns) {
|
||||
Map<String, List<Event>> map = new HashMap<String, List<Event>>();
|
||||
map.put("", data);
|
||||
for (String column : groupByColumns) {
|
||||
|
||||
List<Map<String, List<Event>>> listOfMaps = new ArrayList<Map<String, List<Event>>>();
|
||||
for (String parent : map.keySet()) {
|
||||
List<Event> list = map.get(parent);
|
||||
listOfMaps.add(group(list, column, parent));
|
||||
}
|
||||
|
||||
map.clear();
|
||||
|
||||
// replace map with grouped data
|
||||
for (Map<String, List<Event>> m : listOfMaps) {
|
||||
for (String k : m.keySet()) {
|
||||
map.put(k, m.get(k));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the events from the stats records.
|
||||
*
|
||||
* @param records
|
||||
* @return
|
||||
*/
|
||||
private List<Event> extractEvents(List<StatsRecord> records) {
|
||||
List<Event> eventsList = new ArrayList<Event>(records.size());
|
||||
|
||||
for (StatsRecord record : records) {
|
||||
try {
|
||||
Event event = SerializationUtil.transformFromThrift(
|
||||
Event.class, record.getEvent());
|
||||
eventsList.add(event);
|
||||
} catch (SerializationException e) {
|
||||
statusHandler
|
||||
.error("Error trying to transform event. Aggregation may be inaccurate. ",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
return eventsList;
|
||||
return new TimeRange(start, end);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -302,155 +189,133 @@ public class AggregateManager {
|
|||
return start;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the getter method for the parameter
|
||||
*
|
||||
* @param parameter
|
||||
* @return
|
||||
*/
|
||||
private String getterMethodName(String parameter) {
|
||||
return "get" + parameter.substring(0, 1).toUpperCase()
|
||||
+ parameter.substring(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to group data to one level.
|
||||
*
|
||||
* @param data
|
||||
* @param column
|
||||
* @param parent
|
||||
* @return
|
||||
*/
|
||||
private Map<String, List<Event>> group(List<Event> data, String column,
|
||||
String parent) {
|
||||
Map<String, List<Event>> map = new HashMap<String, List<Event>>();
|
||||
String methodName = getterMethodName(column);
|
||||
for (Event rec : data) {
|
||||
try {
|
||||
Class<?> clazz = rec.getClass();
|
||||
Method m = clazz.getMethod(methodName, new Class<?>[0]);
|
||||
String value = column + ":"
|
||||
+ String.valueOf(m.invoke(rec, new Object[0]));
|
||||
if (parent.length() > 0) {
|
||||
value = parent + "-" + value;
|
||||
}
|
||||
List<Event> list = map.get(value);
|
||||
if (list == null) {
|
||||
list = new ArrayList<Event>();
|
||||
}
|
||||
list.add(rec);
|
||||
map.put(value, list);
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Error creating groups", e);
|
||||
}
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve StatRecords from the metadata.event.stats table. This method
|
||||
* does not retrieve records of the current bucket.
|
||||
*/
|
||||
private void retrieveStatRecords(StatsDao statsRecordDao,
|
||||
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
|
||||
throws Exception {
|
||||
Calendar current = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
|
||||
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
|
||||
for (StatisticsEvent event : statsConfig.getEvents()) {
|
||||
String eventType = event.getType();
|
||||
// Does not retrieve stat records of current bucket.
|
||||
// this method should always return a valid array.
|
||||
StatsRecord[] records = statsRecordDao.retrieveRecords(
|
||||
getBucketStartTime(current), eventType);
|
||||
sort(eventType, records, aggregateBuckets);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans the stats table to be stored in buckets and aggregate if necessary.
|
||||
*/
|
||||
public void scan() throws Exception {
|
||||
long t0 = System.currentTimeMillis();
|
||||
ConfigLoader configLoader = ConfigLoader.getInstance();
|
||||
StatsDao statsRecordDao = new StatsDao();
|
||||
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets = new HashMap<String, Map<TimeRangeKey, List<StatsRecord>>>();
|
||||
AggregateRecordDao aggrRecordDao = new AggregateRecordDao();
|
||||
|
||||
// retrieves records and sorts in buckets
|
||||
retrieveStatRecords(statsRecordDao, aggregateBuckets);
|
||||
Map<String, StatisticsEvent> statsMap = configLoader.getTypeView();
|
||||
|
||||
// loops through map to aggregate buckets
|
||||
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
|
||||
for (StatisticsEvent event : statsConfig.getEvents()) {
|
||||
String eventType = event.getType();
|
||||
// latest time to pull
|
||||
Calendar timeToProcess = Calendar.getInstance(TimeZone
|
||||
.getTimeZone("GMT"));
|
||||
int count = 0;
|
||||
|
||||
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
|
||||
.get(eventType);
|
||||
// map should never be null, since it will be set in the 'sort'
|
||||
// method.
|
||||
for (Iterator<Map.Entry<TimeRangeKey, List<StatsRecord>>> iter = map
|
||||
.entrySet().iterator(); iter.hasNext();) {
|
||||
Entry<TimeRangeKey, List<StatsRecord>> element = iter
|
||||
.next();
|
||||
TimeRangeKey tr = element.getKey();
|
||||
List<StatsRecord> records = element.getValue();
|
||||
if (!records.isEmpty()) {
|
||||
List<Event> data = extractEvents(records);
|
||||
aggregate(event, tr, data);
|
||||
try {
|
||||
statsRecordDao.deleteAll(records);
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Error deleting stat records",
|
||||
e);
|
||||
}
|
||||
// process the events by type
|
||||
for (Map.Entry<String, StatisticsEvent> entry : statsMap.entrySet()) {
|
||||
String type = entry.getKey();
|
||||
StatisticsEvent event = entry.getValue();
|
||||
List<StatsRecord> records = null;
|
||||
|
||||
do {
|
||||
// retrieve stats in blocks of 1000
|
||||
records = statsRecordDao.retrieveRecords(timeToProcess, type,
|
||||
2000);
|
||||
|
||||
if (!CollectionUtil.isNullOrEmpty(records)) {
|
||||
// sort events into time buckets
|
||||
Map<TimeRange, Multimap<String, Event>> timeMap = sort(
|
||||
event, records);
|
||||
|
||||
for (Map.Entry<TimeRange, Multimap<String, Event>> timeMapEntry : timeMap
|
||||
.entrySet()) {
|
||||
aggregate(aggrRecordDao, event, timeMapEntry.getKey(),
|
||||
timeMapEntry.getValue());
|
||||
}
|
||||
iter.remove();
|
||||
|
||||
try {
|
||||
statsRecordDao.deleteAll(records);
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Error deleting stat records", e);
|
||||
}
|
||||
|
||||
count += records.size();
|
||||
}
|
||||
}
|
||||
} while (!CollectionUtil.isNullOrEmpty(records));
|
||||
}
|
||||
|
||||
long t1 = System.currentTimeMillis();
|
||||
statusHandler.info("Aggregated " + count + " stat events in "
|
||||
+ (t1 - t0) + " ms");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the results into proper aggregate buckets. This method assumes
|
||||
* that the records are in date order.
|
||||
* Sorts the records into time buckets and groups by the underlying Event.
|
||||
*
|
||||
* @param events
|
||||
*/
|
||||
private void sort(String eventType, StatsRecord[] records,
|
||||
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
|
||||
throws Exception {
|
||||
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
|
||||
.get(eventType);
|
||||
if (map == null) {
|
||||
map = new HashMap<TimeRangeKey, List<StatsRecord>>();
|
||||
aggregateBuckets.put(eventType, map);
|
||||
}
|
||||
|
||||
TimeRangeKey timeRange = null;
|
||||
for (StatsRecord record : records) {
|
||||
if (timeRange == null
|
||||
|| !timeRange.contains(record.getDate().getTime())) {
|
||||
// Create bucket based on stats record date
|
||||
timeRange = createTimeRangeKey(record.getDate());
|
||||
}
|
||||
|
||||
List<StatsRecord> bucketList = map.get(timeRange);
|
||||
if (bucketList == null) {
|
||||
bucketList = new ArrayList<StatsRecord>();
|
||||
map.put(timeRange, bucketList);
|
||||
}
|
||||
bucketList.add(record);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if the bucket interval and the scan interval are valid values. If
|
||||
* values are invalid then values will be set to default values.
|
||||
*
|
||||
* @param bucketInt
|
||||
* @param scanInt
|
||||
* @param records
|
||||
* @return
|
||||
*/
|
||||
private void validateIntervals(String bucketInt, String scanInt) {
|
||||
private Map<TimeRange, Multimap<String, Event>> sort(
|
||||
StatisticsEvent statEvent, List<StatsRecord> records) {
|
||||
Map<TimeRange, Multimap<String, Event>> rval = new HashMap<TimeRange, Multimap<String, Event>>();
|
||||
TimeRange timeRange = null;
|
||||
Multimap<String, Event> eventsByGroup = null;
|
||||
final Object[] EMPTY_OBJ_ARR = new Object[0];
|
||||
StringBuilder group = new StringBuilder();
|
||||
|
||||
for (StatsRecord record : records) {
|
||||
if ((timeRange == null)
|
||||
|| !timeRange.contains(record.getDate().getTime())) {
|
||||
// Create bucket based on stats record date
|
||||
timeRange = createTimeRange(record.getDate());
|
||||
eventsByGroup = rval.get(timeRange);
|
||||
if (eventsByGroup == null) {
|
||||
eventsByGroup = ArrayListMultimap.create();
|
||||
rval.put(timeRange, eventsByGroup);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// get underlying event
|
||||
Event event = SerializationUtil.transformFromThrift(
|
||||
Event.class, record.getEvent());
|
||||
|
||||
// determine group
|
||||
boolean addDelim = false;
|
||||
Iterator<Method> gMethodIter = statEvent.getGroupByMethods()
|
||||
.iterator();
|
||||
Iterator<StatisticsGroup> gFieldNameIter = statEvent
|
||||
.getGroupList().iterator();
|
||||
group.setLength(0);
|
||||
|
||||
while (gMethodIter.hasNext() && gFieldNameIter.hasNext()) {
|
||||
Method m = gMethodIter.next();
|
||||
String field = gFieldNameIter.next().getName();
|
||||
String gVal = String
|
||||
.valueOf(m.invoke(event, EMPTY_OBJ_ARR));
|
||||
|
||||
if (addDelim) {
|
||||
group.append('-');
|
||||
} else {
|
||||
addDelim = true;
|
||||
}
|
||||
|
||||
group.append(field).append(':').append(gVal);
|
||||
}
|
||||
|
||||
eventsByGroup.put(group.toString(), event);
|
||||
} catch (Exception e) {
|
||||
statusHandler
|
||||
.error("Error processing event. Aggregation may be inaccurate. ",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if the bucket interval is a valid value. If value is invalid then
|
||||
* value will be set to default value.
|
||||
*
|
||||
* @param bucketInt
|
||||
* @return
|
||||
*/
|
||||
private void validateIntervals(String bucketInt) {
|
||||
try {
|
||||
bucketInterval = Integer.parseInt(bucketInt);
|
||||
} catch (NumberFormatException e) {
|
||||
|
@ -460,26 +325,6 @@ public class AggregateManager {
|
|||
+ defaultBucketInterval + "'");
|
||||
}
|
||||
|
||||
try {
|
||||
scanInterval = Integer.parseInt(scanInt);
|
||||
} catch (NumberFormatException e) {
|
||||
scanInterval = defaultScanInterval;
|
||||
statusHandler.info("'" + scanInt
|
||||
+ "' is not a valid scan interval value. Setting to '"
|
||||
+ defaultScanInterval + "'");
|
||||
}
|
||||
|
||||
if (scanInterval < bucketInterval) {
|
||||
scanInterval = defaultBucketInterval;
|
||||
bucketInterval = defaultBucketInterval;
|
||||
statusHandler
|
||||
.info("The bucket interval can not be greater than the scan interval. Setting scan interval to '"
|
||||
+ defaultBucketInterval
|
||||
+ "' and bucket interval to '"
|
||||
+ bucketInterval
|
||||
+ "'");
|
||||
}
|
||||
|
||||
int incrementsWithinHour = bucketInterval;
|
||||
// checks if period is larger than 60 minutes
|
||||
if (bucketInterval > 60) {
|
||||
|
@ -492,5 +337,4 @@ public class AggregateManager {
|
|||
+ bucketInterval + "'");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
package com.raytheon.uf.edex.stats.dao;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
|
||||
import com.raytheon.uf.common.stats.AggregateRecord;
|
||||
import com.raytheon.uf.common.util.CollectionUtil;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
import com.raytheon.uf.edex.database.dao.CoreDao;
|
||||
import com.raytheon.uf.edex.database.dao.DaoConfig;
|
||||
import com.raytheon.uf.edex.database.query.DatabaseQuery;
|
||||
|
||||
public class AggregateRecordDao extends CoreDao {
|
||||
/**
|
||||
* Creates a new data access object
|
||||
*/
|
||||
public AggregateRecordDao() {
|
||||
super(DaoConfig.forClass("metadata", AggregateRecord.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves stat records that has a date before the limit.
|
||||
*
|
||||
* @param limit
|
||||
* @param eventType
|
||||
* @param maxResults
|
||||
* if greater than 0 will limit database results to maxResults
|
||||
* @return an array of stat records. If an error occurs, then an array of
|
||||
* size 0 will be returned.
|
||||
* @throws DataAccessLayerException
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void mergeRecord(AggregateRecord newRecord)
|
||||
throws DataAccessLayerException {
|
||||
DatabaseQuery query = new DatabaseQuery(AggregateRecord.class);
|
||||
query.addQueryParam("eventType", newRecord.getEventType(),
|
||||
QueryOperand.EQUALS);
|
||||
query.addQueryParam("field", newRecord.getField(), QueryOperand.EQUALS);
|
||||
query.addQueryParam("grouping", newRecord.getGrouping(),
|
||||
QueryOperand.EQUALS);
|
||||
query.addQueryParam("startDate", newRecord.getStartDate(),
|
||||
QueryOperand.EQUALS);
|
||||
query.addQueryParam("endDate", newRecord.getEndDate(),
|
||||
QueryOperand.EQUALS);
|
||||
|
||||
List<AggregateRecord> results = (List<AggregateRecord>) queryByCriteria(query);
|
||||
if (!CollectionUtil.isNullOrEmpty(results)) {
|
||||
// shouldn't be able to get multiple results, just merge with first
|
||||
// and update
|
||||
AggregateRecord prevRecord = results.get(0);
|
||||
prevRecord.setCount(prevRecord.getCount() + newRecord.getCount());
|
||||
prevRecord.setSum(prevRecord.getSum() + newRecord.getSum());
|
||||
if (newRecord.getMin() < prevRecord.getMin()) {
|
||||
prevRecord.setMin(newRecord.getMin());
|
||||
}
|
||||
if (newRecord.getMax() > prevRecord.getMax()) {
|
||||
prevRecord.setMax(newRecord.getMax());
|
||||
}
|
||||
update(prevRecord);
|
||||
} else {
|
||||
persist(newRecord);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,17 +5,12 @@ import java.util.List;
|
|||
|
||||
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
|
||||
import com.raytheon.uf.common.stats.StatsRecord;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
import com.raytheon.uf.edex.database.dao.CoreDao;
|
||||
import com.raytheon.uf.edex.database.dao.DaoConfig;
|
||||
import com.raytheon.uf.edex.database.query.DatabaseQuery;
|
||||
|
||||
public class StatsDao extends CoreDao {
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(StatsDao.class);
|
||||
|
||||
/**
|
||||
* Creates a new data access object
|
||||
*/
|
||||
|
@ -28,30 +23,24 @@ public class StatsDao extends CoreDao {
|
|||
*
|
||||
* @param limit
|
||||
* @param eventType
|
||||
* @param maxResults
|
||||
* if greater than 0 will limit database results to maxResults
|
||||
* @return an array of stat records. If an error occurs, then an array of
|
||||
* size 0 will be returned.
|
||||
* @throws DataAccessLayerException
|
||||
*/
|
||||
public StatsRecord[] retrieveRecords(Calendar limit, String eventType) {
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<StatsRecord> retrieveRecords(Calendar limit, String eventType,
|
||||
int maxResults) throws DataAccessLayerException {
|
||||
DatabaseQuery query = new DatabaseQuery(StatsRecord.class);
|
||||
query.addQueryParam("eventType", eventType, QueryOperand.EQUALS);
|
||||
query.addQueryParam("date", limit, QueryOperand.LESSTHAN);
|
||||
query.setMaxResults(1000);
|
||||
query.addOrder("date", true);
|
||||
|
||||
// TODO Need to make StatsDao to keep track to determine next 1000
|
||||
// results.
|
||||
StatsRecord[] records = null;
|
||||
try {
|
||||
List<?> objects = queryByCriteria(query);
|
||||
records = new StatsRecord[objects.size()];
|
||||
for (int i = 0; i < records.length; i++) {
|
||||
records[i] = (StatsRecord) objects.get(i);
|
||||
}
|
||||
} catch (DataAccessLayerException e) {
|
||||
records = new StatsRecord[0];
|
||||
statusHandler.error("Error querying the stats table", e);
|
||||
if (maxResults > 0) {
|
||||
query.setMaxResults(maxResults);
|
||||
}
|
||||
|
||||
return records;
|
||||
return (List<StatsRecord>) queryByCriteria(query);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,17 +43,17 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
|
|||
|
||||
/**
|
||||
* Graph Data Request Handler.
|
||||
*
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Sep 11, 2012 728 mpduff Initial creation
|
||||
*
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* @author mpduff
|
||||
* @version 1.0
|
||||
*/
|
||||
|
@ -98,13 +98,13 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
|
|||
|
||||
/**
|
||||
* Get the statistical configuration objects and add them to the response.
|
||||
*
|
||||
*
|
||||
* @return GraphDataResponse
|
||||
*
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
private GraphDataResponse getMetaData() throws Exception {
|
||||
ConfigLoader loader = new ConfigLoader();
|
||||
ConfigLoader loader = ConfigLoader.getInstance();
|
||||
loader.load();
|
||||
List<StatisticsConfig> configList = loader.getConfigurations();
|
||||
GraphDataResponse response = new GraphDataResponse();
|
||||
|
@ -115,7 +115,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
|
|||
|
||||
/**
|
||||
* Get the Graph Data object and add it to the response.
|
||||
*
|
||||
*
|
||||
* @param request
|
||||
* The request object
|
||||
* @return GraphDataResponse
|
||||
|
@ -136,8 +136,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
|
|||
}
|
||||
|
||||
if (request.getField() != null) {
|
||||
query.addQueryParam(FIELD, request.getField(),
|
||||
QueryOperand.EQUALS);
|
||||
query.addQueryParam(FIELD, request.getField(), QueryOperand.EQUALS);
|
||||
}
|
||||
|
||||
List<?> results = dao.queryByCriteria(query);
|
||||
|
@ -171,7 +170,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
|
|||
|
||||
/**
|
||||
* Convert a Date object to Calendar object.
|
||||
*
|
||||
*
|
||||
* @param date
|
||||
* @return Calendar object
|
||||
*/
|
||||
|
@ -184,7 +183,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
|
|||
|
||||
/**
|
||||
* Get the display unit.
|
||||
*
|
||||
*
|
||||
* @param dataType
|
||||
* @param type
|
||||
* @param category
|
||||
|
@ -192,7 +191,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
|
|||
*/
|
||||
private String getDisplayUnit(String category, String type, String dataType)
|
||||
throws Exception {
|
||||
ConfigLoader loader = new ConfigLoader();
|
||||
ConfigLoader loader = ConfigLoader.getInstance();
|
||||
loader.load();
|
||||
List<StatisticsConfig> configList = loader.getConfigurations();
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ public class StatsHandler {
|
|||
* @throws Exception
|
||||
*/
|
||||
protected void loadEventValidTypes() throws Exception {
|
||||
ConfigLoader configLoader = new ConfigLoader();
|
||||
ConfigLoader configLoader = ConfigLoader.getInstance();
|
||||
configLoader.load();
|
||||
HashSet<String> myValidEventTypes = new HashSet<String>();
|
||||
|
||||
|
|
|
@ -20,11 +20,17 @@
|
|||
package com.raytheon.uf.edex.stats.util;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
||||
import com.raytheon.uf.common.event.Event;
|
||||
import com.raytheon.uf.common.localization.IPathManager;
|
||||
import com.raytheon.uf.common.localization.LocalizationContext;
|
||||
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
|
||||
|
@ -34,113 +40,246 @@ import com.raytheon.uf.common.serialization.JAXBManager;
|
|||
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
|
||||
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.util.ReflectionException;
|
||||
import com.raytheon.uf.common.util.ReflectionUtil;
|
||||
|
||||
/**
|
||||
* Loads StatisticsConfig files from localization.
|
||||
*
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Aug 21, 2012 jsanchez Updated error handling and validated config files.
|
||||
* Nov 07, 2012 1317 mpduff Update config files.
|
||||
*
|
||||
* Nov 29, 2012 1350 rjpeter Updated to static, fixed localization, increased validation.
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* @author jsanchez
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class ConfigLoader {
|
||||
|
||||
private static final IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(ConfigLoader.class);
|
||||
|
||||
private static final JAXBManager jaxbManager;
|
||||
static {
|
||||
private final JAXBManager jaxbManager;
|
||||
|
||||
private static final ConfigLoader instance = new ConfigLoader();
|
||||
|
||||
private final IPathManager pm = PathManagerFactory.getPathManager();
|
||||
|
||||
private List<StatisticsConfig> configurations = Collections.emptyList();
|
||||
|
||||
private Map<String, StatisticsEvent> classToEventMap = Collections
|
||||
.emptyMap();
|
||||
|
||||
private static final String STATS_DIR = "stats";
|
||||
|
||||
public static ConfigLoader getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
private ConfigLoader() {
|
||||
try {
|
||||
jaxbManager = new JAXBManager(StatisticsConfig.class);
|
||||
} catch (JAXBException e) {
|
||||
load();
|
||||
} catch (Exception e) {
|
||||
throw new ExceptionInInitializerError(e);
|
||||
}
|
||||
}
|
||||
|
||||
private final IPathManager pm = PathManagerFactory.getPathManager();
|
||||
|
||||
private final List<StatisticsConfig> configurations = new ArrayList<StatisticsConfig>();
|
||||
|
||||
private final String STATS_DIR = "stats";
|
||||
|
||||
/**
|
||||
* Returns a list of all StatisticsConfig files.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<StatisticsConfig> getConfigurations() {
|
||||
return configurations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of event type to statistics event.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public Map<String, StatisticsEvent> getTypeView() {
|
||||
return classToEventMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the StatisticsConfig files in the STATS_DIR directory.
|
||||
*/
|
||||
public void load() throws Exception {
|
||||
LocalizationContext[] searchContext = pm
|
||||
.getLocalSearchHierarchy(LocalizationType.EDEX_STATIC);
|
||||
LocalizationFile[] localizationFiles = null;
|
||||
Map<String, LocalizationFile> statConfs = new HashMap<String, LocalizationFile>();
|
||||
|
||||
// grab all stats from contexts, allowing overwrite by name
|
||||
for (LocalizationContext ctx : searchContext) {
|
||||
localizationFiles = pm.listFiles(ctx, STATS_DIR, null, false, true);
|
||||
if (localizationFiles != null && localizationFiles.length > 0) {
|
||||
break;
|
||||
LocalizationFile[] localizationFiles = pm.listFiles(ctx, STATS_DIR,
|
||||
null, false, true);
|
||||
for (LocalizationFile lf : localizationFiles) {
|
||||
String name = lf.getName();
|
||||
if (!statConfs.containsKey(name)) {
|
||||
statConfs.put(name, lf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (localizationFiles != null && localizationFiles.length > 0) {
|
||||
configurations.clear();
|
||||
for (LocalizationFile localizationFile : localizationFiles) {
|
||||
if (localizationFile.getFile() != null
|
||||
&& localizationFile.getFile().exists()) {
|
||||
StatisticsConfig config = (StatisticsConfig) jaxbManager
|
||||
.jaxbUnmarshalFromXmlFile(localizationFile
|
||||
.getFile());
|
||||
config = validateAggregates(config);
|
||||
configurations.add(config);
|
||||
if (!statConfs.isEmpty()) {
|
||||
List<StatisticsConfig> myConfigurations = new ArrayList<StatisticsConfig>(
|
||||
statConfs.size());
|
||||
Map<String, StatisticsEvent> myEvents = new HashMap<String, StatisticsEvent>();
|
||||
|
||||
for (LocalizationFile lf : statConfs.values()) {
|
||||
StatisticsConfig config = lf.jaxbUnmarshal(
|
||||
StatisticsConfig.class, jaxbManager);
|
||||
if (config != null) {
|
||||
validate(myEvents, config);
|
||||
if (!config.getEvents().isEmpty()) {
|
||||
myConfigurations.add(config);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
configurations = myConfigurations;
|
||||
classToEventMap = myEvents;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the aggregate if its not a numerical parameter.
|
||||
*
|
||||
* Validates the StatisticsConfig remove invalid
|
||||
* events/aggregates/duplicates.
|
||||
*
|
||||
* @param config
|
||||
*/
|
||||
private StatisticsConfig validateAggregates(StatisticsConfig config)
|
||||
throws ClassNotFoundException {
|
||||
List<StatisticsAggregate> aggregates = new ArrayList<StatisticsAggregate>();
|
||||
for (StatisticsEvent event : config.getEvents()) {
|
||||
Class<?> clazz = Class.forName(event.getType());
|
||||
aggregates = new ArrayList<StatisticsAggregate>();
|
||||
for (StatisticsAggregate aggregate : event.getAggregateList()) {
|
||||
String aggregateField = aggregate.getField();
|
||||
private void validate(Map<String, StatisticsEvent> eventMap,
|
||||
StatisticsConfig config) {
|
||||
for (Iterator<StatisticsEvent> iter = config.getEvents().iterator(); iter
|
||||
.hasNext();) {
|
||||
StatisticsEvent event = iter.next();
|
||||
String eventType = event.getType();
|
||||
if (!eventMap.containsKey(eventType)) {
|
||||
try {
|
||||
Field field = clazz.getDeclaredField(aggregateField);
|
||||
if (!field.getType().isPrimitive()) {
|
||||
statusHandler
|
||||
.info("'"
|
||||
+ aggregateField
|
||||
+ "' not a primitive type. Aggregate being removed. ");
|
||||
}
|
||||
aggregates.add(aggregate);
|
||||
} catch (NoSuchFieldException e) {
|
||||
statusHandler.info("'" + aggregateField
|
||||
+ "' not a valid field. Aggregate being removed. ");
|
||||
}
|
||||
}
|
||||
event.setAggregateList(aggregates);
|
||||
}
|
||||
Class<?> clazz = Class.forName(eventType);
|
||||
// verify the type is an Event
|
||||
clazz.asSubclass(Event.class);
|
||||
|
||||
return config;
|
||||
// validate groupBy fields can be found
|
||||
List<StatisticsGroup> groups = event.getGroupList();
|
||||
if (groups != null) {
|
||||
List<Method> groupByMethods = new ArrayList<Method>(
|
||||
groups.size());
|
||||
Set<String> currentFields = new HashSet<String>();
|
||||
|
||||
for (Iterator<StatisticsGroup> groupIter = groups
|
||||
.iterator(); groupIter.hasNext();) {
|
||||
StatisticsGroup group = groupIter.next();
|
||||
String fieldName = group.getName();
|
||||
if (!currentFields.contains(fieldName)) {
|
||||
try {
|
||||
Method m = ReflectionUtil.getGetterMethod(
|
||||
clazz, fieldName);
|
||||
groupByMethods.add(m);
|
||||
currentFields.add(fieldName);
|
||||
} catch (ReflectionException e) {
|
||||
groupIter.remove();
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ fieldName
|
||||
+ "' does not have getter method. Group being removed");
|
||||
}
|
||||
} else {
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ fieldName
|
||||
+ "' already defined. Duplicate group being removed.");
|
||||
}
|
||||
}
|
||||
|
||||
event.setGroupByMethods(groupByMethods);
|
||||
}
|
||||
|
||||
// validate aggregate methods can be found and are primitive
|
||||
List<StatisticsAggregate> curAggregates = event
|
||||
.getAggregateList();
|
||||
List<Method> aggregateMethods = new ArrayList<Method>(
|
||||
curAggregates.size());
|
||||
Set<String> currentFields = new HashSet<String>();
|
||||
|
||||
for (Iterator<StatisticsAggregate> aggrIter = curAggregates
|
||||
.iterator(); aggrIter.hasNext();) {
|
||||
StatisticsAggregate aggregate = aggrIter.next();
|
||||
String aggregateField = aggregate.getField();
|
||||
|
||||
try {
|
||||
Field field = clazz
|
||||
.getDeclaredField(aggregateField);
|
||||
if (field.getType().isPrimitive()) {
|
||||
if (!currentFields.contains(aggregateField)) {
|
||||
try {
|
||||
Method m = ReflectionUtil
|
||||
.getGetterMethod(clazz,
|
||||
aggregateField);
|
||||
aggregateMethods.add(m);
|
||||
currentFields.add(aggregateField);
|
||||
} catch (ReflectionException e) {
|
||||
aggrIter.remove();
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ aggregateField
|
||||
+ "' does not have getter method. Aggregate being removed");
|
||||
}
|
||||
} else {
|
||||
aggrIter.remove();
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ aggregateField
|
||||
+ "' already defined. Duplicate aggregate being removed. ");
|
||||
}
|
||||
} else {
|
||||
aggrIter.remove();
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ aggregateField
|
||||
+ "' not a primitive type. Aggregate being removed. ");
|
||||
}
|
||||
} catch (NoSuchFieldException e) {
|
||||
aggrIter.remove();
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ aggregateField
|
||||
+ "' not a valid field. Aggregate being removed. ");
|
||||
}
|
||||
}
|
||||
|
||||
if (!curAggregates.isEmpty()) {
|
||||
event.setAggregateMethods(aggregateMethods);
|
||||
eventMap.put(eventType, event);
|
||||
} else {
|
||||
iter.remove();
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
iter.remove();
|
||||
statusHandler.warn("'" + eventType
|
||||
+ "' not a valid type. Type being removed. ");
|
||||
} catch (ClassCastException e) {
|
||||
iter.remove();
|
||||
statusHandler.warn("'" + eventType
|
||||
+ "' not an Event type. Type being removed. ");
|
||||
}
|
||||
} else {
|
||||
iter.remove();
|
||||
statusHandler
|
||||
.warn("'"
|
||||
+ eventType
|
||||
+ "' is already defined. StatisticsEvent being skipped.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue