Merge "Issue #1340 Updates to statistics for common baseline, data delivery stats gathering" into development

Former-commit-id: 9efc70bc0c [formerly 4916961d3e] [formerly 5a2ba01a49] [formerly b48635490a [formerly 5a2ba01a49 [formerly 6176f3ef90779a5b99c99b1cfd8f0ffc60a6f316]]]
Former-commit-id: b48635490a
Former-commit-id: 0ff709d8e6f762cc5f0efeaf25d94046cd0e4d1e [formerly ffd6fd3e5e]
Former-commit-id: cb99915ea1
This commit is contained in:
Richard Peter 2012-11-20 10:25:57 -06:00 committed by Gerrit Code Review
commit 02a45a5f93
15 changed files with 881 additions and 763 deletions

View file

@ -11,4 +11,5 @@ Require-Bundle: com.raytheon.uf.common.time;bundle-version="1.12.1174",
com.raytheon.uf.common.serialization;bundle-version="1.12.1174",
com.raytheon.uf.common.serialization.comm;bundle-version="1.12.1174",
javax.persistence;bundle-version="1.0.0",
com.raytheon.uf.common.dataplugin;bundle-version="1.12.1174"
com.raytheon.uf.common.dataplugin;bundle-version="1.12.1174",
com.raytheon.uf.common.event;bundle-version="1.0.0"

View file

@ -44,6 +44,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation
* Nov 12, 2012 dhladky Updates some things for stats
*
* </pre>
*
@ -57,6 +58,8 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@DynamicSerialize
public class AggregateRecord extends PersistableDataObject {
private static final long serialVersionUID = -4553588456131256014L;
@GeneratedValue(strategy = GenerationType.AUTO)
@Id
@DynamicSerializeElement
@ -102,88 +105,88 @@ public class AggregateRecord extends PersistableDataObject {
this.eventType = eventType;
this.startDate = startDate;
this.endDate = endDate;
this.grouping = groupings;
grouping = groupings;
this.field = field;
}
public Calendar getStartDate() {
return startDate;
}
public void setStartDate(Calendar startDate) {
this.startDate = startDate;
}
public Calendar getEndDate() {
return endDate;
}
public void setEndDate(Calendar endDate) {
this.endDate = endDate;
}
public String getEventType() {
return eventType;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public String getGrouping() {
return grouping;
}
public void setGrouping(String grouping) {
this.grouping = grouping;
}
public String getField() {
return field;
}
public void setField(String field) {
this.field = field;
}
public double getMax() {
return max;
}
public void setMax(double max) {
this.max = max;
}
public double getMin() {
return min;
}
public void setMin(double min) {
this.min = min;
}
public double getSum() {
return sum;
}
public void setSum(double sum) {
this.sum = sum;
}
public double getCount() {
return count;
}
public void setCount(double count) {
this.count = count;
public Calendar getEndDate() {
return endDate;
}
public String getEventType() {
return eventType;
}
public String getField() {
return field;
}
public String getGrouping() {
return grouping;
}
public Integer getId() {
return id;
}
public double getMax() {
return max;
}
public double getMin() {
return min;
}
public Calendar getStartDate() {
return startDate;
}
public double getSum() {
return sum;
}
public void setCount(double count) {
this.count = count;
}
public void setEndDate(Calendar endDate) {
this.endDate = endDate;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public void setField(String field) {
this.field = field;
}
public void setGrouping(String grouping) {
this.grouping = grouping;
}
public void setId(Integer id) {
this.id = id;
}
public void setMax(double max) {
this.max = max;
}
public void setMin(double min) {
this.min = min;
}
public void setStartDate(Calendar startDate) {
this.startDate = startDate;
}
public void setSum(double sum) {
this.sum = sum;
}
}

View file

@ -1,25 +1,27 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.event;
package com.raytheon.uf.common.stats;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@ -41,10 +43,18 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* @version 1.0
*/
@DynamicSerialize
public class ProcessEvent extends Event {
public class ProcessEvent extends StatisticsEvent {
private static final long serialVersionUID = 1L;
private static final Map<String, String> FIELD_UNIT_MAP;
static {
Map<String, String> m = new HashMap<String, String>();
m.put("processingLatency", "ms");
m.put("processingTime", "ms");
FIELD_UNIT_MAP = Collections.unmodifiableMap(m);
}
@DynamicSerializeElement
private String message;
@ -70,8 +80,15 @@ public class ProcessEvent extends Event {
}
@Override
public String toString() {
return super.toString() + " : " + getMessage();
protected Map<String, String> getFieldUnitMap() {
return FIELD_UNIT_MAP;
}
/**
* @return the fileName
*/
public String getFileName() {
return fileName;
}
/**
@ -81,14 +98,6 @@ public class ProcessEvent extends Event {
return message;
}
/**
* @param message
* the message to set
*/
public void setMessage(String message) {
this.message = message;
}
/**
* @return the pluginName
*/
@ -97,18 +106,17 @@ public class ProcessEvent extends Event {
}
/**
* @param pluginName
* the pluginName to set
* @return the processingLatency in milliseconds
*/
public void setPluginName(String pluginName) {
this.pluginName = pluginName;
public long getProcessingLatency() {
return processingLatency;
}
/**
* @return the fileName
* @return the processingTime in milliseconds
*/
public String getFileName() {
return fileName;
public long getProcessingTime() {
return processingTime;
}
/**
@ -120,25 +128,19 @@ public class ProcessEvent extends Event {
}
/**
* @return the processingTime in milliseconds
* @param message
* the message to set
*/
public long getProcessingTime() {
return processingTime;
public void setMessage(String message) {
this.message = message;
}
/**
* @param processingTime
* the processingTime in milliseconds to set
* @param pluginName
* the pluginName to set
*/
public void setProcessingTime(long processingTime) {
this.processingTime = processingTime;
}
/**
* @return the processingLatency in milliseconds
*/
public long getProcessingLatency() {
return processingLatency;
public void setPluginName(String pluginName) {
this.pluginName = pluginName;
}
/**
@ -149,4 +151,17 @@ public class ProcessEvent extends Event {
this.processingLatency = processingLatency;
}
}
/**
* @param processingTime
* the processingTime in milliseconds to set
*/
public void setProcessingTime(long processingTime) {
this.processingTime = processingTime;
}
@Override
public String toString() {
return super.toString() + " : " + getMessage();
}
}

View file

@ -0,0 +1,58 @@
package com.raytheon.uf.common.stats;
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
import java.util.Map;
import java.util.Set;
import com.raytheon.uf.common.event.Event;
/**
* Event used for statistics
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 25, 2012 #1340 dhladky Initial creation
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
public abstract class StatisticsEvent extends Event {
private static final long serialVersionUID = 1L;
public Set<String> getFields() {
return getFieldUnitMap().keySet();
}
protected abstract Map<String, String> getFieldUnitMap();
public String getStorageUnit(String field) {
return getFieldUnitMap().get(field);
}
}

View file

@ -73,36 +73,47 @@ public class StatsRecord extends PersistableDataObject {
@DynamicSerializeElement
private byte[] event;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Calendar getDate() {
return date;
}
public void setDate(Calendar date) {
this.date = date;
}
public String getEventType() {
return eventType;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public byte[] getEvent() {
return event;
}
public String getEventType() {
return eventType;
}
public Integer getId() {
return id;
}
public void setDate(Calendar date) {
this.date = date;
}
public void setEvent(byte[] event) {
this.event = event;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public void setId(Integer id) {
this.id = id;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getEventType() + " ");
sb.append(getDate() + " ");
sb.append(getId() + " ");
return sb.toString();
}
}

View file

@ -33,17 +33,17 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
/**
* Statistics Configuration Event xml element.
*
*
* <pre>
*
*
* SOFTWARE HISTORY
*
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 6, 2012 728 mpduff Initial creation.
*
*
* </pre>
*
*
* @author mpduff
* @version 1.0
*/
@ -72,48 +72,6 @@ public class StatisticsEvent {
@DynamicSerializeElement
private List<StatisticsAggregate> aggregateList;
/**
* @return the type
*/
public String getType() {
return type;
}
/**
* @param type the type to set
*/
public void setType(String type) {
this.type = type;
}
/**
* @return the displayName
*/
public String getDisplayName() {
return displayName;
}
/**
* @param displayName the displayName to set
*/
public void setDisplayName(String displayName) {
this.displayName = displayName;
}
/**
* @return the groupList
*/
public List<StatisticsGroup> getGroupList() {
return groupList;
}
/**
* @param groupList the groupList to set
*/
public void setGroupList(List<StatisticsGroup> groupList) {
this.groupList = groupList;
}
/**
* @return the aggregateList
*/
@ -121,13 +79,6 @@ public class StatisticsEvent {
return aggregateList;
}
/**
* @param aggregateList the aggregateList to set
*/
public void setAggregateList(List<StatisticsAggregate> aggregateList) {
this.aggregateList = aggregateList;
}
/**
* @return the category
*/
@ -136,9 +87,63 @@ public class StatisticsEvent {
}
/**
* @param category the category to set
* @return the displayName
*/
public String getDisplayName() {
return displayName;
}
/**
* @return the groupList
*/
public List<StatisticsGroup> getGroupList() {
return groupList;
}
/**
* @return the type
*/
public String getType() {
return type;
}
/**
* @param aggregateList
* the aggregateList to set
*/
public void setAggregateList(List<StatisticsAggregate> aggregateList) {
this.aggregateList = aggregateList;
}
/**
* @param category
* the category to set
*/
public void setCategory(String category) {
this.category = category;
}
/**
* @param displayName
* the displayName to set
*/
public void setDisplayName(String displayName) {
this.displayName = displayName;
}
/**
* @param groupList
* the groupList to set
*/
public void setGroupList(List<StatisticsGroup> groupList) {
this.groupList = groupList;
}
/**
* @param type
* the type to set
*/
public void setType(String type) {
this.type = type;
}
}

View file

@ -15,7 +15,8 @@ Require-Bundle: org.apache.camel;bundle-version="1.0.0",
com.raytheon.uf.common.util,
com.raytheon.uf.edex.core,
org.apache.commons.logging,
javax.servlet
javax.servlet,
com.raytheon.uf.common.stats;bundle-version="1.0.0"
Export-Package: com.raytheon.uf.edex.esb.camel,
com.raytheon.uf.edex.esb.camel.directvm
Import-Package: com.raytheon.uf.common.event,

View file

@ -29,7 +29,7 @@ import org.apache.camel.Header;
import org.apache.camel.Headers;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.event.ProcessEvent;
import com.raytheon.uf.common.stats.ProcessEvent;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
@ -67,6 +67,41 @@ public class ProcessUtil {
};
/**
* Get the value of a specified property if it exists.
*
* @param <T>
* Type of the receiving variable.
* @param headers
* Map of header properties.
* @param propertyName
* Name of the property to get.
* @return Value of the requested property. Null if the property does not
* exist.
*/
@SuppressWarnings("unchecked")
private static <T> T getHeaderProperty(Map<?, ?> headers,
Object propertyName) {
Object o = headers.get(propertyName);
T result = null;
if (o != null) {
result = (T) o;
}
return result;
}
/**
* Return an incoming array as an Iterator to the elements.
*
* @param objects
* @return
*/
public static Iterator<?> iterate(PluginDataObject[] objects) {
return Arrays.asList(objects).iterator();
}
public void delete(@Header(value = "ingestFileName") String path) {
File f = new File(path);
if (f.exists()) {
@ -108,7 +143,7 @@ public class ProcessUtil {
DecimalFormat df = FORMAT.get();
if (dequeueTime != null) {
long elapsedMilliseconds = curTime - dequeueTime;
double elapsed = (elapsedMilliseconds) / 1000.0;
double elapsed = elapsedMilliseconds / 1000.0;
sb.append(" processed in: ");
sb.append(df.format(elapsed));
sb.append(" (sec)");
@ -118,7 +153,7 @@ public class ProcessUtil {
Long enqueueTime = getHeaderProperty(headers, "enqueueTime");
if (enqueueTime != null) {
long latencyMilliseconds = curTime - enqueueTime;
double latency = (latencyMilliseconds) / 1000.0;
double latency = latencyMilliseconds / 1000.0;
sb.append(" Latency: ");
sb.append(df.format(latency));
sb.append(" (sec)");
@ -134,39 +169,4 @@ public class ProcessUtil {
handler.handle(Priority.INFO, "No logging information available");
}
}
/**
* Return an incoming array as an Iterator to the elements.
*
* @param objects
* @return
*/
public static Iterator<?> iterate(PluginDataObject[] objects) {
return Arrays.asList(objects).iterator();
}
/**
* Get the value of a specified property if it exists.
*
* @param <T>
* Type of the receiving variable.
* @param headers
* Map of header properties.
* @param propertyName
* Name of the property to get.
* @return Value of the requested property. Null if the property does not
* exist.
*/
@SuppressWarnings("unchecked")
private static <T> T getHeaderProperty(Map<?, ?> headers,
Object propertyName) {
Object o = headers.get(propertyName);
T result = null;
if (o != null) {
result = (T) o;
}
return result;
}
}

View file

@ -16,7 +16,7 @@
<constructor-arg value="${stats.scanInterval}"/>
</bean>
<bean id="edexStatsRegistered" factory-bean="contextManager"
<bean id="edexStatsRegistered" factory-bean="clusteredCamelContextMgr"
factory-method="register" depends-on="persistCamelRegistered">
<constructor-arg ref="edexStats-camel"/>
</bean>

View file

@ -50,27 +50,26 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
/**
* Aggregates stat records based on the statsConfig files and stores them after
* a configured period.
*
*
* *
*
*
* <pre>
*
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Stored the aggregate buckets in the db.
* Nov 07, 2012 1317 mpduff Updated Configuration Files.
*
*
* </pre>
*
*
* @author jsanchez
*
*
*/
public class AggregateManager {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(AggregateManager.class);
private class TimeRangeKey extends TimeRange {
private static final long serialVersionUID = 4603487307433273159L;
public TimeRangeKey(Calendar cal1, Calendar cal2) {
super(cal1, cal2);
}
@ -80,8 +79,8 @@ public class AggregateManager {
if (o != null && o instanceof TimeRange) {
TimeRange other = (TimeRange) o;
return (this.getStart().equals(other.getStart()) && this
.getEnd().equals(other.getEnd()));
return getStart().equals(other.getStart())
&& getEnd().equals(other.getEnd());
}
return false;
@ -98,6 +97,9 @@ public class AggregateManager {
}
}
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(AggregateManager.class);
/** In minutes */
private int bucketInterval;
@ -118,16 +120,332 @@ public class AggregateManager {
public AggregateManager(String bucketInterval, String scanInterval)
throws Exception {
this.configLoader = new ConfigLoader();
configLoader = new ConfigLoader();
validateIntervals(bucketInterval, scanInterval);
configLoader.load();
StatsHandler.setValidEventTypes(configLoader.getConfigurations());
}
/**
* Performs the aggregation based on the statsConfig file.
*
* @param key
* @param data
*/
private void aggregate(StatisticsEvent statsEvent, TimeRange timeRange,
List<Event> data) {
Calendar start = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
start.setTime(timeRange.getStart());
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
end.setTime(timeRange.getEnd());
// collect grouping names from stats config
List<String> groupByColumns = new ArrayList<String>();
for (StatisticsGroup groupBy : statsEvent.getGroupList()) {
String column = groupBy.getName();
groupByColumns.add(column);
}
// breaks data into groups
Map<String, List<Event>> map = divideIntoGroups(data, groupByColumns);
// perform aggregate functions on the grouped data
for (String groupKey : map.keySet()) {
List<Event> groupData = map.get(groupKey);
for (StatisticsAggregate aggregate : statsEvent.getAggregateList()) {
String field = aggregate.getField();
try {
double[] values = new double[groupData.size()];
String methodName = getterMethodName(field);
for (int i = 0; i < groupData.size(); i++) {
Object obj = groupData.get(i);
Class<?> clazz = obj.getClass();
Method m = clazz.getMethod(methodName, new Class<?>[0]);
Number number = (Number) m.invoke(obj, new Object[0]);
values[i] = number.doubleValue();
}
double count = values.length;
double max = 0;
double min = Double.MAX_VALUE;
double sum = 0;
for (double value : values) {
sum += value;
if (value > max) {
max = value;
}
if (value < min) {
min = value;
}
}
AggregateRecord record = new AggregateRecord(
statsEvent.getType(), start, end, groupKey, field);
record.setSum(sum);
record.setMin(min);
record.setMax(max);
record.setCount(count);
aggregateRecordDao.persist(record);
} catch (Exception e) {
statusHandler.error("Unable to aggregate '" + field + "'",
e);
}
}
}
}
/**
* Creates a time range from a date and the bucket interval. The time range
* start time that will be the date rounded to the next bucket interval. The
* time range end time will be the start time plus the bucket interval.
*
* @param date
* @return
*/
private TimeRangeKey createTimeRangeKey(Calendar date) {
Calendar start = getBucketStartTime(date);
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
end.setTimeInMillis(start.getTimeInMillis());
end.add(Calendar.MINUTE, bucketInterval);
TimeRangeKey timeRangeKey = new TimeRangeKey(start, end);
return timeRangeKey;
}
/**
* Breaks the list of data into groups based on groupByColumns. The key is a
* concatenation of the column values (i.e. datatype.username). This method
* can group data to n-number of levels.
*
* @param data
* @param groupByColumns
* @return
*/
private Map<String, List<Event>> divideIntoGroups(List<Event> data,
List<String> groupByColumns) {
Map<String, List<Event>> map = new HashMap<String, List<Event>>();
map.put("", data);
for (String column : groupByColumns) {
List<Map<String, List<Event>>> listOfMaps = new ArrayList<Map<String, List<Event>>>();
for (String parent : map.keySet()) {
List<Event> list = map.get(parent);
listOfMaps.add(group(list, column, parent));
}
map.clear();
// replace map with grouped data
for (Map<String, List<Event>> m : listOfMaps) {
for (String k : m.keySet()) {
map.put(k, m.get(k));
}
}
}
return map;
}
/**
* Extracts the events from the stats records.
*
* @param records
* @return
*/
private List<Event> extractEvents(List<StatsRecord> records) {
List<Event> eventsList = new ArrayList<Event>(records.size());
for (StatsRecord record : records) {
try {
Event event = SerializationUtil.transformFromThrift(
Event.class, record.getEvent());
eventsList.add(event);
} catch (SerializationException e) {
statusHandler
.error("Error trying to transform event. Aggregation may be inaccurate. ",
e);
}
}
return eventsList;
}
/**
* Calculates the start time that will be the date rounded to the next
* bucket interval
*
* @param date
* @return
*/
private Calendar getBucketStartTime(Calendar date) {
int currentMinutes = date.get(Calendar.MINUTE);
int incrementsWithinHour = bucketInterval;
// checks if period is larger than 60 minutes
if (bucketInterval > 60) {
incrementsWithinHour = bucketInterval % 60;
}
int mod = currentMinutes % incrementsWithinHour;
Calendar start = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
start.setTimeInMillis(date.getTimeInMillis());
start.add(Calendar.MINUTE, -mod);
start.set(Calendar.SECOND, 0);
start.set(Calendar.MILLISECOND, 0);
return start;
}
/**
* Returns the name of the getter method for the parameter
*
* @param parameter
* @return
*/
private String getterMethodName(String parameter) {
return "get" + parameter.substring(0, 1).toUpperCase()
+ parameter.substring(1);
}
/**
* Helper method to group data to one level.
*
* @param data
* @param column
* @param parent
* @return
*/
private Map<String, List<Event>> group(List<Event> data, String column,
String parent) {
Map<String, List<Event>> map = new HashMap<String, List<Event>>();
String methodName = getterMethodName(column);
for (Event rec : data) {
try {
Class<?> clazz = rec.getClass();
Method m = clazz.getMethod(methodName, new Class<?>[0]);
String value = column + ":"
+ String.valueOf(m.invoke(rec, new Object[0]));
if (parent.length() > 0) {
value = parent + "-" + value;
}
List<Event> list = map.get(value);
if (list == null) {
list = new ArrayList<Event>();
}
list.add(rec);
map.put(value, list);
} catch (Exception e) {
statusHandler.error("Error creating groups", e);
}
}
return map;
}
/**
* Retrieve StatRecords from the metadata.event.stats table. This method
* does not retrieve records of the current bucket.
*/
private void retrieveStatRecords(StatsDao statsRecordDao,
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
throws Exception {
Calendar current = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
for (StatisticsEvent event : statsConfig.getEvents()) {
String eventType = event.getType();
// Does not retrieve stat records of current bucket.
// this method should always return a valid array.
StatsRecord[] records = statsRecordDao.retrieveRecords(
getBucketStartTime(current), eventType);
sort(eventType, records, aggregateBuckets);
}
}
}
/**
* Scans the stats table to be stored in buckets and aggregate if necessary.
*/
public void scan() throws Exception {
StatsDao statsRecordDao = new StatsDao();
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets = new HashMap<String, Map<TimeRangeKey, List<StatsRecord>>>();
// retrieves records and sorts in buckets
retrieveStatRecords(statsRecordDao, aggregateBuckets);
// loops through map to aggregate buckets
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
for (StatisticsEvent event : statsConfig.getEvents()) {
String eventType = event.getType();
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
.get(eventType);
// map should never be null, since it will be set in the 'sort'
// method.
for (Iterator<Map.Entry<TimeRangeKey, List<StatsRecord>>> iter = map
.entrySet().iterator(); iter.hasNext();) {
Entry<TimeRangeKey, List<StatsRecord>> element = iter
.next();
TimeRangeKey tr = element.getKey();
List<StatsRecord> records = element.getValue();
if (!records.isEmpty()) {
List<Event> data = extractEvents(records);
aggregate(event, tr, data);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records",
e);
}
}
iter.remove();
}
}
}
}
/**
* Stores the results into proper aggregate buckets. This method assumes
* that the records are in date order.
*
* @param events
*/
private void sort(String eventType, StatsRecord[] records,
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
throws Exception {
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
.get(eventType);
if (map == null) {
map = new HashMap<TimeRangeKey, List<StatsRecord>>();
aggregateBuckets.put(eventType, map);
}
TimeRangeKey timeRange = null;
for (StatsRecord record : records) {
if (timeRange == null
|| !timeRange.contains(record.getDate().getTime())) {
// Create bucket based on stats record date
timeRange = createTimeRangeKey(record.getDate());
}
List<StatsRecord> bucketList = map.get(timeRange);
if (bucketList == null) {
bucketList = new ArrayList<StatsRecord>();
map.put(timeRange, bucketList);
}
bucketList.add(record);
}
}
/**
* Tests if the bucket interval and the scan interval are valid values. If
* values are invalid then values will be set to default values.
*
*
* @param bucketInt
* @param scanInt
* @return
@ -175,320 +493,4 @@ public class AggregateManager {
}
}
/**
* Scans the stats table to be stored in buckets and aggregate if necessary.
*/
public void scan() throws Exception {
StatsDao statsRecordDao = new StatsDao();
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets = new HashMap<String, Map<TimeRangeKey, List<StatsRecord>>>();
// retrieves records and sorts in buckets
retrieveStatRecords(statsRecordDao, aggregateBuckets);
// loops through map to aggregate buckets
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
for (StatisticsEvent event : statsConfig.getEvents()) {
String eventType = event.getType();
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
.get(eventType);
// map should never be null, since it will be set in the 'sort'
// method.
for (Iterator<Map.Entry<TimeRangeKey, List<StatsRecord>>> iter = map
.entrySet().iterator(); iter.hasNext();) {
Entry<TimeRangeKey, List<StatsRecord>> element = iter
.next();
TimeRangeKey tr = element.getKey();
List<StatsRecord> records = element.getValue();
if (!records.isEmpty()) {
List<Event> data = extractEvents(records);
aggregate(event, tr, data);
try {
statsRecordDao.deleteAll(records);
} catch (Exception e) {
statusHandler.error("Error deleting stat records",
e);
}
}
iter.remove();
}
}
}
}
/**
* Retrieve StatRecords from the metadata.event.stats table. This method
* does not retrieve records of the current bucket.
*/
private void retrieveStatRecords(StatsDao statsRecordDao,
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
throws Exception {
Calendar current = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
for (StatisticsConfig statsConfig : configLoader.getConfigurations()) {
for (StatisticsEvent event : statsConfig.getEvents()) {
String eventType = event.getType();
// Does not retrieve stat records of current bucket.
// this method should always return a valid array.
StatsRecord[] records = statsRecordDao.retrieveRecords(
getBucketStartTime(current), eventType);
sort(eventType, records, aggregateBuckets);
}
}
}
/**
* Stores the results into proper aggregate buckets. This method assumes
* that the records are in date order.
*
* @param events
*/
private void sort(String eventType, StatsRecord[] records,
Map<String, Map<TimeRangeKey, List<StatsRecord>>> aggregateBuckets)
throws Exception {
Map<TimeRangeKey, List<StatsRecord>> map = aggregateBuckets
.get(eventType);
if (map == null) {
map = new HashMap<TimeRangeKey, List<StatsRecord>>();
aggregateBuckets.put(eventType, map);
}
TimeRangeKey timeRange = null;
for (StatsRecord record : records) {
if (timeRange == null
|| !timeRange.contains(record.getDate().getTime())) {
// Create bucket based on stats record date
timeRange = createTimeRangeKey(record.getDate());
}
List<StatsRecord> bucketList = map.get(timeRange);
if (bucketList == null) {
bucketList = new ArrayList<StatsRecord>();
map.put(timeRange, bucketList);
}
bucketList.add(record);
}
}
/**
* Creates a time range from a date and the bucket interval. The time range
* start time that will be the date rounded to the next bucket interval. The
* time range end time will be the start time plus the bucket interval.
*
* @param date
* @return
*/
private TimeRangeKey createTimeRangeKey(Calendar date) {
Calendar start = getBucketStartTime(date);
Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
end.setTimeInMillis(start.getTimeInMillis());
end.add(Calendar.MINUTE, bucketInterval);
TimeRangeKey timeRangeKey = new TimeRangeKey(start, end);
return timeRangeKey;
}
/**
* Calculates the start time that will be the date rounded to the next
* bucket interval
*
* @param date
* @return
*/
private Calendar getBucketStartTime(Calendar date) {
int currentMinutes = date.get(Calendar.MINUTE);
int incrementsWithinHour = bucketInterval;
// checks if period is larger than 60 minutes
if (bucketInterval > 60) {
incrementsWithinHour = bucketInterval % 60;
}
int mod = currentMinutes % incrementsWithinHour;
Calendar start = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
start.setTimeInMillis(date.getTimeInMillis());
start.add(Calendar.MINUTE, -mod);
start.set(Calendar.SECOND, 0);
start.set(Calendar.MILLISECOND, 0);
return start;
}
/**
* Extracts the events from the stats records.
*
* @param records
* @return
*/
private List<Event> extractEvents(List<StatsRecord> records) {
List<Event> eventsList = new ArrayList<Event>(records.size());
for (StatsRecord record : records) {
try {
Event event = (Event) SerializationUtil
.transformFromThrift(record.getEvent());
eventsList.add(event);
} catch (SerializationException e) {
statusHandler
.error("Error trying to transform event. Aggregation may be inaccurate. ",
e);
}
}
return eventsList;
}
/**
 * Performs the aggregation based on the statsConfig file.
 *
 * Groups the events by the configured grouping columns, then computes
 * count/min/max/sum for each configured aggregate field (read off each
 * event via its reflected getter) and persists one AggregateRecord per
 * group/field combination. A failure on one field is logged and does not
 * abort the remaining aggregates.
 *
 * @param statsEvent
 *            statistics event configuration (grouping columns and
 *            aggregate fields)
 * @param timeRange
 *            the time window the aggregate records cover
 * @param data
 *            the events to aggregate
 */
private void aggregate(StatisticsEvent statsEvent, TimeRange timeRange,
        List<Event> data) {
    Calendar start = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    start.setTime(timeRange.getStart());

    Calendar end = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    end.setTime(timeRange.getEnd());

    // collect grouping names from stats config
    List<String> groupByColumns = new ArrayList<String>();
    for (StatisticsGroup groupBy : statsEvent.getGroupList()) {
        groupByColumns.add(groupBy.getName());
    }

    // breaks data into groups
    Map<String, List<Event>> map = divideIntoGroups(data, groupByColumns);

    // perform aggregate functions on the grouped data
    for (String groupKey : map.keySet()) {
        List<Event> groupData = map.get(groupKey);
        for (StatisticsAggregate aggregate : statsEvent.getAggregateList()) {
            String field = aggregate.getField();
            try {
                // pull the numeric field value off each event via its getter
                double[] values = new double[groupData.size()];
                String methodName = getterMethodName(field);
                for (int i = 0; i < groupData.size(); i++) {
                    Object obj = groupData.get(i);
                    Method m = obj.getClass().getMethod(methodName,
                            new Class<?>[0]);
                    Number number = (Number) m.invoke(obj, new Object[0]);
                    values[i] = number.doubleValue();
                }

                double count = values.length;
                double sum = 0;
                // BUGFIX: seed max/min from +/- infinity instead of
                // max = 0 / min = Double.MAX_VALUE; the old max = 0 seed
                // reported a wrong maximum whenever every value in the
                // group was negative.
                double max = Double.NEGATIVE_INFINITY;
                double min = Double.POSITIVE_INFINITY;
                for (double value : values) {
                    sum += value;
                    if (value > max) {
                        max = value;
                    }
                    if (value < min) {
                        min = value;
                    }
                }
                if (values.length == 0) {
                    // avoid persisting infinite sentinels for empty groups
                    max = 0;
                    min = 0;
                }

                AggregateRecord record = new AggregateRecord(
                        statsEvent.getType(), start, end, groupKey, field);
                record.setSum(sum);
                record.setMin(min);
                record.setMax(max);
                record.setCount(count);
                aggregateRecordDao.persist(record);
            } catch (Exception e) {
                statusHandler.error("Unable to aggregate '" + field + "'",
                        e);
            }
        }
    }
}
/**
 * Breaks the list of data into groups based on groupByColumns. The key is a
 * concatenation of the column values (i.e. datatype.username). This method
 * can group data to n-number of levels.
 *
 * @param data
 *            the events to partition
 * @param groupByColumns
 *            ordered list of grouping column names
 * @return map of composite group key to the events in that group
 */
private Map<String, List<Event>> divideIntoGroups(List<Event> data,
        List<String> groupByColumns) {
    // start with a single unnamed group holding everything, then refine
    // it one grouping column at a time
    Map<String, List<Event>> grouped = new HashMap<String, List<Event>>();
    grouped.put("", data);

    for (String column : groupByColumns) {
        Map<String, List<Event>> refined = new HashMap<String, List<Event>>();
        for (Map.Entry<String, List<Event>> entry : grouped.entrySet()) {
            refined.putAll(group(entry.getValue(), column, entry.getKey()));
        }
        // replace map with grouped data
        grouped = refined;
    }

    return grouped;
}
/**
 * Helper method to group data to one level. Each event is keyed by
 * "column:value" (prefixed with "parent-" when a parent key exists),
 * where value is read off the event via its reflected getter.
 *
 * @param data
 *            the events to group
 * @param column
 *            the grouping column name
 * @param parent
 *            the composite key of the enclosing group, or "" at the top
 * @return map of group key to the events sharing that key
 */
private Map<String, List<Event>> group(List<Event> data, String column,
        String parent) {
    Map<String, List<Event>> grouped = new HashMap<String, List<Event>>();
    String methodName = getterMethodName(column);
    for (Event event : data) {
        try {
            Method getter = event.getClass().getMethod(methodName,
                    new Class<?>[0]);
            String key = column + ":"
                    + String.valueOf(getter.invoke(event, new Object[0]));
            if (parent.length() > 0) {
                key = parent + "-" + key;
            }

            List<Event> members = grouped.get(key);
            if (members == null) {
                members = new ArrayList<Event>();
                grouped.put(key, members);
            }
            members.add(event);
        } catch (Exception e) {
            statusHandler.error("Error creating groups", e);
        }
    }
    return grouped;
}
/**
 * Returns the name of the JavaBean getter method for the parameter, e.g.
 * "processingTime" becomes "getProcessingTime".
 *
 * Uses Character.toUpperCase rather than String.toUpperCase so the
 * capitalization is locale-independent: String.toUpperCase() uses the
 * JVM default locale, which under e.g. the Turkish locale turns "i" into
 * a dotted capital I and breaks the reflective getter lookup.
 *
 * @param parameter
 *            the field name; must be non-empty
 * @return the getter method name
 */
private String getterMethodName(String parameter) {
    return "get" + Character.toUpperCase(parameter.charAt(0))
            + parameter.substring(1);
}
}

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.stats;
import java.io.File;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
@ -67,15 +68,15 @@ public class StatsPurge {
private Archiver archiver;
private CoreDao aggregateRecordDao = new CoreDao(DaoConfig.forClass(
private final CoreDao aggregateRecordDao = new CoreDao(DaoConfig.forClass(
"metadata", AggregateRecord.class));
private CoreDao statsRecordDao = new CoreDao(DaoConfig.forClass("metadata",
StatsRecord.class));
private final CoreDao statsRecordDao = new CoreDao(DaoConfig.forClass(
"metadata", StatsRecord.class));
private PurgeRuleSet aggregatePurgeRules;
private final PurgeRuleSet aggregatePurgeRules;
private PurgeRuleSet statsPurgeRules;
private final PurgeRuleSet statsPurgeRules;
public StatsPurge() {
aggregatePurgeRules = readPurgeRules("aggregatePurgeRules.xml");
@ -83,10 +84,6 @@ public class StatsPurge {
try {
archiver = new Archiver();
purgeStats();
} catch (JAXBException e) {
statusHandler
.error("Error starting up archiver. Aggregates will not be archived. ",
e);
} catch (DataAccessLayerException e) {
statusHandler
.error("Error purging stats on start up. Stats will not be purged. ",
@ -95,30 +92,48 @@ public class StatsPurge {
}
/**
* Reads the purge files.
* Purges records from the aggregate table and writes them to disk.
*/
private PurgeRuleSet readPurgeRules(String xml) {
PurgeRuleSet purgeRules = null;
try {
File file = PathManagerFactory.getPathManager().getStaticFile(
"purge/" + xml);
if (file != null) {
try {
purgeRules = (PurgeRuleSet) SerializationUtil
.jaxbUnmarshalFromXmlFile(file);
} catch (SerializationException e) {
statusHandler.error("Error deserializing purge rule " + xml
+ "!");
}
public void purgeAggregates() throws JAXBException,
DataAccessLayerException {
if (aggregatePurgeRules != null) {
Calendar expiration = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
DatabaseQuery query = new DatabaseQuery(AggregateRecord.class);
List<PurgeRule> allRules = new ArrayList<PurgeRule>();
} else {
statusHandler.error(xml
+ " rule not defined!! Data will not be purged.");
// check for specific rules, if none, apply defaults
if (!aggregatePurgeRules.getRules().isEmpty()) {
allRules.addAll(aggregatePurgeRules.getRules());
} else if (!aggregatePurgeRules.getDefaultRules().isEmpty()) {
allRules.addAll(aggregatePurgeRules.getDefaultRules());
}
for (PurgeRule rule : allRules) {
if (rule.isPeriodSpecified()) {
long ms = rule.getPeriodInMillis();
int minutes = new Long(ms / (1000 * 60)).intValue();
expiration.add(Calendar.MINUTE, -minutes);
query.addQueryParam("endDate", expiration,
QueryOperand.LESSTHAN);
List<?> objects = aggregateRecordDao.queryByCriteria(query);
if (!objects.isEmpty()) {
AggregateRecord[] aggregateRecords = new AggregateRecord[objects
.size()];
for (int i = 0; i < aggregateRecords.length; i++) {
aggregateRecords[i] = (AggregateRecord) objects
.get(i);
}
archiver.writeToDisk(aggregateRecords);
aggregateRecordDao.deleteAll(objects);
}
}
}
} catch (Exception e) {
statusHandler.error("Error reading purge file " + xml, e);
}
return purgeRules;
}
/**
@ -145,36 +160,30 @@ public class StatsPurge {
}
/**
* Purges records from the aggregate table and writes them to disk.
* Reads the purge files.
*/
public void purgeAggregates() throws JAXBException,
DataAccessLayerException {
if (aggregatePurgeRules != null) {
Calendar expiration = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
DatabaseQuery query = new DatabaseQuery(AggregateRecord.class);
private PurgeRuleSet readPurgeRules(String xml) {
PurgeRuleSet purgeRules = null;
try {
File file = PathManagerFactory.getPathManager().getStaticFile(
"purge/" + xml);
if (file != null) {
try {
purgeRules = SerializationUtil.jaxbUnmarshalFromXmlFile(
PurgeRuleSet.class, file);
for (PurgeRule rule : aggregatePurgeRules.getRules()) {
if (rule.isPeriodSpecified()) {
long ms = rule.getPeriodInMillis();
int minutes = new Long(ms / (1000 * 60)).intValue();
expiration.add(Calendar.MINUTE, -minutes);
query.addQueryParam("endDate", expiration,
QueryOperand.LESSTHAN);
List<?> objects = aggregateRecordDao.queryByCriteria(query);
if (!objects.isEmpty()) {
AggregateRecord[] aggregateRecords = new AggregateRecord[objects
.size()];
for (int i = 0; i < aggregateRecords.length; i++) {
aggregateRecords[i] = (AggregateRecord) objects
.get(i);
}
archiver.writeToDisk(aggregateRecords);
aggregateRecordDao.deleteAll(objects);
}
} catch (SerializationException e) {
statusHandler.error("Error deserializing purge rule " + xml
+ "!");
}
} else {
statusHandler.error(xml
+ " rule not defined!! Data will not be purged.");
}
} catch (Exception e) {
statusHandler.error("Error reading purge file " + xml, e);
}
return purgeRules;
}
}

View file

@ -39,20 +39,20 @@ import com.raytheon.uf.edex.event.EventBus;
/**
* Subscribes to the event bus and stores them in the appropriate stats table
*
*
*
*
* <pre>
*
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Removed instance variable of event bus.
* Nov 07, 2012 1317 mpduff Updated config files.
*
*
* </pre>
*
*
* @author jsanchez
*
*
*/
public class StatsHandler {
private static final IUFStatusHandler statusHandler = UFStatus
@ -63,6 +63,21 @@ public class StatsHandler {
private static Set<String> validEventTypes = new HashSet<String>();
/**
* Set the valid event types.
*
* @param configurations
* List of StatisticsConfig objects
*/
public static void setValidEventTypes(List<StatisticsConfig> configurations) {
validEventTypes = new HashSet<String>();
for (StatisticsConfig config : configurations) {
for (StatisticsEvent event : config.getEvents()) {
validEventTypes.add(event.getType());
}
}
}
/**
* Registers StatsHandler with the event bus
*/
@ -84,24 +99,10 @@ public class StatsHandler {
record.setEventType(clazz);
record.setEvent(bytes);
dao.persist(record);
} catch (SerializationException e) {
statusHandler.error("Error transforming to Thrift.", e);
}
}
}
/**
* Set the valid event types.
*
* @param configurations
* List of StatisticsConfig objects
*/
public static void setValidEventTypes(List<StatisticsConfig> configurations) {
validEventTypes = new HashSet<String>();
for (StatisticsConfig config : configurations) {
for (StatisticsEvent event : config.getEvents()) {
validEventTypes.add(event.getType());
}
}
}
}

View file

@ -19,15 +19,18 @@
**/
package com.raytheon.uf.edex.stats.util;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
@ -38,11 +41,8 @@ import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.TimeRange;
import com.raytheon.uf.edex.stats.xml.Aggregate;
import com.raytheon.uf.edex.stats.xml.GroupBy;
import com.raytheon.uf.edex.stats.xml.Item;
import com.raytheon.uf.edex.stats.xml.Statistics;
/**
* Archives the data in the aggregate_bucket table to an xml file.
@ -53,6 +53,7 @@ import com.raytheon.uf.edex.stats.xml.Statistics;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation.
* Nov 09, 2012 dhladky Changed to CSV output
*
* </pre>
*
@ -61,9 +62,6 @@ import com.raytheon.uf.edex.stats.xml.Statistics;
*/
public class Archiver {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(Archiver.class);
private class StatisticsKey {
public String eventType;
@ -76,10 +74,10 @@ public class Archiver {
if (o != null && o instanceof StatisticsKey) {
StatisticsKey other = (StatisticsKey) o;
return (this.eventType.equals(other.eventType)
&& this.timeRange.getStart().equals(
other.timeRange.getStart()) && this.timeRange
.getEnd().equals(other.timeRange.getEnd()));
return eventType.equals(other.eventType)
&& timeRange.getStart().equals(
other.timeRange.getStart())
&& timeRange.getEnd().equals(other.timeRange.getEnd());
}
return false;
@ -91,39 +89,98 @@ public class Archiver {
}
}
/** Marshaller object */
private Marshaller marshaller;
private static final String COMMA = ",";
/** JAXB context */
private JAXBContext jax;
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(Archiver.class);
private IPathManager pm = PathManagerFactory.getPathManager();
private final IPathManager pm = PathManagerFactory.getPathManager();
private LocalizationContext context = pm.getContext(
private final LocalizationContext context = pm.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.SITE);
private SimpleDateFormat dateFormatter = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private SimpleDateFormat fileDateFormatter = new SimpleDateFormat(
"yyyyMMdd_HHmm");
private static final String FILE_DATE_FORMAT = "yyyyMMdd_HHmm";
private static final Pattern PERIOD_PATTERN = Pattern.compile("\\.");
public Archiver() {
public Archiver() throws JAXBException {
jax = JAXBContext.newInstance(new Class[] { Statistics.class });
this.marshaller = jax.createMarshaller();
}
/**
* Writes the statistics xml to disk.
* Creates a filename in the format /stats/aggregates/group...
* /eventType.start-end.dat
*
* @param statistics
* @throws JAXBException
* @param items
* @return
*/
public void writeToDisk(String filename, Statistics statistics)
throws JAXBException {
LocalizationFile siteLocalization = pm.getLocalizationFile(context,
filename);
marshaller.marshal(statistics, siteLocalization.getFile());
private String createFilename(TimeRange tr, String eventType, String group) {
SimpleDateFormat fileDateFormatter = new SimpleDateFormat(
FILE_DATE_FORMAT);
StringBuilder sb = new StringBuilder("stats/aggregates");
String[] chunks = PERIOD_PATTERN.split(eventType);
sb.append("/");
sb.append(group);
sb.append("/");
sb.append(chunks[chunks.length - 1]);
sb.append(".");
sb.append(fileDateFormatter.format(tr.getStart()));
sb.append("-");
sb.append(fileDateFormatter.format(tr.getEnd()));
sb.append(".csv");
return sb.toString();
}
/**
* Used for outputting the stats as CSV
*
* @return
*/
private String getCSVOutput(AggregateRecord agrec,
SimpleDateFormat dateFormat) {
StringBuilder sb = new StringBuilder();
String eventType = agrec.getEventType();
Calendar startDate = agrec.getStartDate();
Calendar endDate = agrec.getEndDate();
String grouping = agrec.getGrouping();
String field = agrec.getField();
double max = agrec.getMax();
double min = agrec.getMin();
double sum = agrec.getSum();
double count = agrec.getCount();
if (eventType != null) {
sb.append(eventType).append(COMMA);
}
if (startDate != null) {
sb.append(dateFormat.format(startDate.getTime()))
.append(COMMA);
}
if (endDate != null) {
sb.append(dateFormat.format(endDate.getTime())).append(
COMMA);
}
if (grouping != null) {
sb.append(grouping).append(COMMA);
}
if (field != null) {
sb.append(field).append(COMMA);
}
sb.append(max).append(COMMA);
sb.append(min).append(COMMA);
sb.append(sum).append(COMMA);
sb.append(count);
return sb.toString();
}
/**
@ -153,108 +210,63 @@ public class Archiver {
}
for (StatisticsKey key : statisticsMap.keySet()) {
Statistics statistics = new Statistics();
statistics.setEventType(key.eventType);
statistics.setStart(dateFormatter.format(key.timeRange.getStart()));
statistics.setEnd(dateFormatter.format(key.timeRange.getEnd()));
statistics.setGroupBy(createGroupBy(key.grouping));
statistics.setAggregates(createAggregates(statisticsMap.get(key)));
String filename = createFilename(key.timeRange, statistics);
String eventType = key.eventType;
String grouping = key.grouping;
List<AggregateRecord> records = statisticsMap.get(key);
String filename = createFilename(key.timeRange, eventType, grouping);
try {
writeToDisk(filename, statistics);
writeToFile(filename, records);
} catch (JAXBException e) {
statusHandler.error("Unable to write statistics file "
+ filename, e);
}
}
}
/**
* Creates a filename in the format
* /stats/aggregates/groupBy{0}/groupby{1}...
* /group{n}/eventType.start-end.dat
* Writes the statistics xml to disk.
*
* @param items
* @return
* @param statistics
* @throws JAXBException
*/
private String createFilename(TimeRange tr, Statistics statistics) {
StringBuffer sb = new StringBuffer("stats/aggregates");
for (Item item : statistics.getGroupBy().getAttributes()) {
sb.append("/" + item.getResult());
}
sb.append("/" + statistics.getEventType() + "."
+ fileDateFormatter.format(tr.getStart()) + "-"
+ fileDateFormatter.format(tr.getEnd()) + ".dat");
public void writeToFile(String filename, List<AggregateRecord> records)
throws JAXBException {
return sb.toString();
}
BufferedWriter bw = null;
SimpleDateFormat dateFormatter = new SimpleDateFormat(DATE_FORMAT);
LocalizationFile siteLocalization = pm.getLocalizationFile(context,
filename);
String outputFilePath = siteLocalization.getFile().getAbsolutePath();
// pre-create directories if necessary
siteLocalization.getFile().getParentFile().mkdirs();
// Write this to output CSV
try {
bw = new BufferedWriter(new FileWriter(
outputFilePath));
if (bw != null) {
for (AggregateRecord agrec : records) {
bw.write(getCSVOutput(agrec, dateFormatter));
bw.newLine();
}
}
/**
* Transforms the grouping string from the record into a GroupBy object.
*
* @param recordGroupBy
* @return
*/
private GroupBy createGroupBy(String recordGroupBy) {
GroupBy groupBy = new GroupBy();
String[] groups = recordGroupBy.split("-");
Item[] attributes = new Item[groups.length];
} catch (IOException e) {
for (int i = 0; i < groups.length; i++) {
String[] g = groups[i].split(":");
String name = g[0];
String result = g[1];
Item item = new Item();
item.setName(name);
item.setResult(result);
attributes[i] = item;
statusHandler.handle(Priority.ERROR, "Failed to write File: "
+ outputFilePath, e);
} finally {
if (bw != null) {
try {
bw.close();
} catch (IOException e) {
statusHandler.handle(Priority.PROBLEM,
"failed to close CSV output file stream. "
+ filename, e);
}
}
}
groupBy.setAttributes(attributes);
return groupBy;
}
/**
* Transforms the records into Aggregate objects
*
* @param aggregateRecordList
* @return
*/
private Aggregate[] createAggregates(
List<AggregateRecord> aggregateRecordList) {
Aggregate[] aggregates = new Aggregate[aggregateRecordList.size()];
for (int i = 0; i < aggregates.length; i++) {
AggregateRecord record = aggregateRecordList.get(i);
Aggregate aggregate = new Aggregate();
aggregate.setField(record.getField());
Item sumItem = new Item();
sumItem.setName("sum");
sumItem.setResult(String.valueOf(record.getSum()));
Item minItem = new Item();
minItem.setName("min");
minItem.setResult(String.valueOf(record.getMin()));
Item maxItem = new Item();
sumItem.setName("max");
sumItem.setResult(String.valueOf(record.getMax()));
Item countItem = new Item();
minItem.setName("count");
minItem.setResult(String.valueOf(record.getCount()));
aggregate.setFunctions(new Item[] { sumItem, minItem, maxItem,
countItem });
aggregates[i] = aggregate;
}
return aggregates;
}
}

View file

@ -40,19 +40,19 @@ import com.raytheon.uf.edex.stats.xml.Statistics;
/**
* Loads StatisticsConfig files from localization.
*
*
* <pre>
*
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Updated error handling and validated config files.
* Nov 07, 2012 1317 mpduff Update config files.
*
*
* </pre>
*
*
* @author jsanchez
*
*
*/
public class ConfigLoader {
@ -75,6 +75,15 @@ public class ConfigLoader {
private final String STATS_DIR = "stats";
/**
* Returns a list of all StatisticsConfig files.
*
* @return
*/
public List<StatisticsConfig> getConfigurations() {
return configurations;
}
/**
* Loads the StatisticsConfig files in the STATS_DIR directory.
*/
@ -106,7 +115,7 @@ public class ConfigLoader {
/**
* Removes the aggregate if its not a numerical parameter.
*
*
* @param config
*/
private StatisticsConfig validateAggregates(StatisticsConfig config)
@ -114,7 +123,7 @@ public class ConfigLoader {
List<StatisticsAggregate> aggregates = new ArrayList<StatisticsAggregate>();
for (StatisticsEvent event : config.getEvents()) {
Class<?> clazz = Class.forName(event.getType());
aggregates = new ArrayList<StatisticsAggregate>();
for (StatisticsAggregate aggregate : event.getAggregateList()) {
String aggregateField = aggregate.getField();
try {
@ -136,13 +145,4 @@ public class ConfigLoader {
return config;
}
/**
* Returns a list of all StatisticsConfig files.
*
* @return
*/
public List<StatisticsConfig> getConfigurations() {
return configurations;
}
}

View file

@ -1,6 +1,6 @@
<statisticsConfig>
<!-- Event Type should be fully qualified name of stat event -->
<statisticsEvent type="com.raytheon.uf.common.event.ProcessEvent"
<statisticsEvent type="com.raytheon.uf.common.stats.ProcessEvent"
displayName="Processing Events" category="Data Ingest Events">
<statisticsGroup name="pluginName" displayName="Data Type" />
<statisticsAggregate field="processingTime"