Issue #1917: Update stat offline processing and add purge. Add batching of stat deleting and vacuum after every aggregation period.

Change-Id: Ia92ea7dd37202cdf3806529661588b776e98c47c

Former-commit-id: a48084f6cd99a440fcaab025c0178cf1fa17c9bb
This commit is contained in:
Richard Peter 2013-05-23 11:50:00 -05:00
parent 2a733fa5f8
commit 7349808131
32 changed files with 1295 additions and 525 deletions

View file

@ -28,7 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.raytheon.uf.common.stats.data.StatsEventData;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
/**
@ -40,7 +40,8 @@ import com.raytheon.uf.common.stats.xml.StatisticsGroup;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 8, 2012 728 mpduff Initial creation
* Nov 8, 2012 728 mpduff Initial creation
* May 22, 2013 1917 rjpeter Renamed StatisticsEvent to StatisticsEventConfig
*
* </pre>
*
@ -83,7 +84,7 @@ public class StatsUiUtils {
*/
@VisibleForTesting
void processConfig(StatisticsConfig config) {
for (StatisticsEvent event: config.getEvents()) {
for (StatisticsEventConfig event: config.getEvents()) {
processEvent(event);
}
}
@ -94,7 +95,7 @@ public class StatsUiUtils {
* @param event event config object
*/
@VisibleForTesting
void processEvent(StatisticsEvent event) {
void processEvent(StatisticsEventConfig event) {
if (!eventMap.containsKey(event.getCategory())) {
eventMap.put(event.getCategory(), new HashMap<String, StatsEventData>());
}
@ -143,7 +144,7 @@ public class StatsUiUtils {
public Map<String, String> getEventAttributes(String category, String type) {
Map<String, String> attMap = new TreeMap<String, String>();
for (StatisticsConfig config: configList) {
for (StatisticsEvent event: config.getEvents()) {
for (StatisticsEventConfig event: config.getEvents()) {
if (event.getCategory().equals(category) && event.getDisplayName().equals(type)) {
for (StatisticsAggregate agg: event.getAggregateList()) {
attMap.put(agg.getDisplayName(), agg.getField());
@ -186,7 +187,7 @@ public class StatsUiUtils {
public StatisticsAggregate getAggregateConfig(String category,
String typeID, String attributeDisplayName) {
for (StatisticsConfig config : configList) {
for (StatisticsEvent event: config.getEvents()) {
for (StatisticsEventConfig event: config.getEvents()) {
if (event.getCategory().equals(category) && event.getType().equals(typeID)) {
for (StatisticsAggregate agg: event.getAggregateList()) {
if (agg.getDisplayName().equals(attributeDisplayName)) {

View file

@ -0,0 +1,11 @@
#!/bin/bash
# 1917 Removes old aggregate format/layout
echo "Removing old stat aggregates"
rm -rf /awips2/edex/data/utility/common_static/site/*/stats/aggregates
# run full vacuum on stats table, code keeps table more stable
PSQL="/awips2/psql/bin/psql"
echo "Running full vacuum on stats"
${PSQL} -U awips -d metadata -c "VACUUM FULL ANALYZE events.stats;"

View file

@ -1,7 +1,8 @@
<statisticsConfig>
<!-- Event Type should be fully qualified name of stat event -->
<statisticsEvent type="com.raytheon.uf.common.stats.LoadEvent"
displayName="Load Time" category="FFMP Load Times">
displayName="Load Time" category="FFMP Load Times"
rawOfflineRetentionDays="90" aggregateOfflineRetentionDays="90">
<statisticsGroup name="type" displayName="Type" />
<!-- Processing time available display units:
ms, Seconds, Minutes, Hours -->

View file

@ -49,7 +49,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* ------------ ---------- ----------- --------------------------
* Sep 24, 2008 chammack Initial creation
* Nov 13, 2008 njensen Added thrift methods
*
* May 22, 2013 1917 rjpeter Added non-pretty print option to jaxb serialize methods.
* </pre>
*
* @author chammack
@ -81,7 +81,7 @@ public class JAXBManager {
private static class MaintainEventsValidationHandler implements
ValidationEventHandler {
private ArrayList<ValidationEvent> events = new ArrayList<ValidationEvent>(
private final ArrayList<ValidationEvent> events = new ArrayList<ValidationEvent>(
0);
@Override
@ -105,9 +105,9 @@ public class JAXBManager {
private final JAXBContext jaxbContext;
private Queue<Unmarshaller> unmarshallers = new ConcurrentLinkedQueue<Unmarshaller>();
private final Queue<Unmarshaller> unmarshallers = new ConcurrentLinkedQueue<Unmarshaller>();
private Queue<Marshaller> marshallers = new ConcurrentLinkedQueue<Marshaller>();
private final Queue<Marshaller> marshallers = new ConcurrentLinkedQueue<Marshaller>();
public JAXBManager(Class<?>... clazz) throws JAXBException {
jaxbContext = JAXBContext.newInstance(clazz);
@ -164,7 +164,7 @@ public class JAXBManager {
return obj;
} finally {
handleEvents(msh, null);
if (msh != null && unmarshallers.size() < QUEUE_SIZE) {
if ((msh != null) && (unmarshallers.size() < QUEUE_SIZE)) {
unmarshallers.add(msh);
}
}
@ -221,8 +221,8 @@ public class JAXBManager {
}
/**
* Convert an instance of a class to an XML representation in a string. Uses
* JAXB.
* Convert an instance of a class to an XML pretty print representation in a
* string. Uses JAXB.
*
* @param obj
* Object being marshalled
@ -230,19 +230,51 @@ public class JAXBManager {
* @throws JAXBException
*/
public String marshalToXml(Object obj) throws JAXBException {
return marshalToXml(obj, true);
}
/**
* Convert an instance of a class to an XML representation in a string. Uses
* JAXB.
*
* @param obj
* Object being marshalled
* @param formattedOutput
* True if the output should be xml pretty print.
* @return XML string representation of the object
* @throws JAXBException
*/
public String marshalToXml(Object obj, boolean formatedOutput)
throws JAXBException {
Marshaller msh = getMarshaller();
try {
StringWriter writer = new StringWriter();
msh.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, new Boolean(true));
msh.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, new Boolean(
formatedOutput));
msh.marshal(obj, writer);
return writer.toString();
} finally {
if (msh != null && marshallers.size() < QUEUE_SIZE) {
if ((msh != null) && (marshallers.size() < QUEUE_SIZE)) {
marshallers.add(msh);
}
}
}
/**
* Convert an instance of a class to an XML representation and writes pretty
* print formatted XML to file. Uses JAXB.
*
* @param obj
* Object to be marshaled
* @param filePath
* Path to the output file
* @throws SerializationException
*/
public void jaxbMarshalToXmlFile(Object obj, String filePath)
throws SerializationException {
jaxbMarshalToXmlFile(obj, filePath, true);
}
/**
* Convert an instance of a class to an XML representation and write XML to
* file. Uses JAXB.
@ -251,21 +283,24 @@ public class JAXBManager {
* Object to be marshaled
* @param filePath
* Path to the output file
* @param formattedOutput
* True if the output should be xml pretty print.
* @throws SerializationException
*/
public void jaxbMarshalToXmlFile(Object obj, String filePath)
throws SerializationException {
public void jaxbMarshalToXmlFile(Object obj, String filePath,
boolean formattedOutput) throws SerializationException {
FileWriter writer = null;
Marshaller msh = null;
try {
msh = getMarshaller();
msh.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, new Boolean(true));
msh.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, new Boolean(
formattedOutput));
writer = new FileWriter(new File(filePath));
msh.marshal(obj, writer);
} catch (Exception e) {
throw new SerializationException(e);
} finally {
if (msh != null && marshallers.size() < QUEUE_SIZE) {
if ((msh != null) && (marshallers.size() < QUEUE_SIZE)) {
marshallers.add(msh);
}
if (writer != null) {
@ -315,7 +350,7 @@ public class JAXBManager {
if (msh != null) {
handleEvents(msh, file.getName());
}
if (msh != null && unmarshallers.size() < QUEUE_SIZE) {
if ((msh != null) && (unmarshallers.size() < QUEUE_SIZE)) {
unmarshallers.add(msh);
}
if (reader != null) {
@ -350,7 +385,7 @@ public class JAXBManager {
if (msh != null) {
handleEvents(msh, null);
}
if (msh != null && unmarshallers.size() < QUEUE_SIZE) {
if ((msh != null) && (unmarshallers.size() < QUEUE_SIZE)) {
unmarshallers.add(msh);
}
if (is != null) {

View file

@ -20,4 +20,5 @@ Require-Bundle: com.raytheon.uf.common.time;bundle-version="1.12.1174",
com.raytheon.uf.common.status;bundle-version="1.12.1174",
javax.measure;bundle-version="1.0.0",
com.raytheon.uf.common.units;bundle-version="1.0.0",
org.apache.commons.lang;bundle-version="2.3.0"
org.apache.commons.lang;bundle-version="2.3.0",
org.hibernate

View file

@ -33,8 +33,8 @@ import javax.xml.bind.annotation.XmlRootElement;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 15, 2013 1487 djohnson Initial creation
*
* Jan 15, 2013 1487 djohnson Initial creation
* May 22, 2013 1917 rjpeter Added hashCode and equals.
* </pre>
*
* @author djohnson
@ -98,4 +98,41 @@ public class StatsGrouping {
this.value = value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StatsGrouping other = (StatsGrouping) obj;
if (name == null) {
if (other.name != null) {
return false;
}
} else if (!name.equals(other.name)) {
return false;
}
if (value == null) {
if (other.value != null) {
return false;
}
} else if (!value.equals(other.value)) {
return false;
}
return true;
}
}

View file

@ -37,8 +37,8 @@ import com.google.common.collect.Lists;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 15, 2013 1487 djohnson Initial creation
*
* Jan 15, 2013 1487 djohnson Initial creation
* May 22, 2013 1917 rjpeter Added hashCode and equals.
* </pre>
*
* @author djohnson
@ -84,4 +84,34 @@ public class StatsGroupingColumn {
return column;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((group == null) ? 0 : group.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StatsGroupingColumn other = (StatsGroupingColumn) obj;
if (group == null) {
if (other.group != null) {
return false;
}
} else if (!group.equals(other.group)) {
return false;
}
return true;
}
}

View file

@ -31,6 +31,8 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.hibernate.annotations.BatchSize;
import com.raytheon.uf.common.dataplugin.persist.PersistableDataObject;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@ -43,14 +45,14 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation
*
* Aug 21, 2012 jsanchez Initial creation
* May 22, 2013 1917 rjpeter Added BatchSize annotation.
* </pre>
*
* @author jsanchez
*
*/
@Entity
@BatchSize(size = 500)
@Table(name = "stats", schema = "events")
@XmlRootElement
@XmlAccessorType(XmlAccessType.NONE)

View file

@ -43,8 +43,8 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 6, 2012 728 mpduff Initial creation.
*
* Nov 6, 2012 728 mpduff Initial creation.
* May 22, 2013 1917 rjpeter Renamed StatisticsEvent to StatisticsEventConfig.
* </pre>
*
* @author mpduff
@ -54,14 +54,14 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@XmlRootElement(name = "statisticsConfig")
@XmlAccessorType(XmlAccessType.NONE)
public class StatisticsConfig implements ISerializableObject {
@XmlElements({ @XmlElement(name = "statisticsEvent", type = StatisticsEvent.class) })
@XmlElements({ @XmlElement(name = "statisticsEvent", type = StatisticsEventConfig.class) })
@DynamicSerializeElement
private List<StatisticsEvent> events;
private List<StatisticsEventConfig> events;
/**
* @return the events
*/
public List<StatisticsEvent> getEvents() {
public List<StatisticsEventConfig> getEvents() {
return events;
}
@ -69,7 +69,7 @@ public class StatisticsConfig implements ISerializableObject {
* @param events
* the events to set
*/
public void setEvents(List<StatisticsEvent> events) {
public void setEvents(List<StatisticsEventConfig> events) {
this.events = events;
}
@ -81,7 +81,7 @@ public class StatisticsConfig implements ISerializableObject {
public List<String> getCategories() {
Set<String> categories = new HashSet<String>();
if (events != null && events.size() > 0) {
for (StatisticsEvent event : events) {
for (StatisticsEventConfig event : events) {
categories.add(event.getCategory());
}
}

View file

@ -42,8 +42,9 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 6, 2012 728 mpduff Initial creation.
*
* Nov 6, 2012 728 mpduff Initial creation.
* May 22, 2013 1917 rjpeter Renamed to StatisticsEventConfig and
* added offline retention settings.
* </pre>
*
* @author mpduff
@ -52,7 +53,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@DynamicSerialize
@XmlRootElement(name = "event")
@XmlAccessorType(XmlAccessType.NONE)
public class StatisticsEvent {
public class StatisticsEventConfig {
@XmlAttribute
@DynamicSerializeElement
@ -66,6 +67,22 @@ public class StatisticsEvent {
@DynamicSerializeElement
private String category;
/**
* Retention period for the raw offline statistic to be saved. Value < 0 do
* not retain, value = 0 retain all, value > 0 retain for value days.
*/
@XmlAttribute
@DynamicSerializeElement
private int rawOfflineRetentionDays = -1;
/**
* Retention period for the aggregate offline statistic to be saved. Value <
* 0 do not retain, value = 0 retain all, value > 0 retain for value days.
*/
@XmlAttribute
@DynamicSerializeElement
private int aggregateOfflineRetentionDays;
@XmlElements({ @XmlElement(name = "statisticsGroup", type = StatisticsGroup.class) })
@DynamicSerializeElement
private List<StatisticsGroup> groupList;
@ -179,4 +196,20 @@ public class StatisticsEvent {
this.aggregateMethods = aggregateMethods;
}
public int getRawOfflineRetentionDays() {
return rawOfflineRetentionDays;
}
public void setRawOfflineRetentionDays(int rawOfflineRetentionDays) {
this.rawOfflineRetentionDays = rawOfflineRetentionDays;
}
public int getAggregateOfflineRetentionDays() {
return aggregateOfflineRetentionDays;
}
public void setAggregateOfflineRetentionDays(
int aggregateOfflineRetentionDays) {
this.aggregateOfflineRetentionDays = aggregateOfflineRetentionDays;
}
}

View file

@ -1,4 +0,0 @@
# scan interval of stats table in minutes
stats.scanInterval=15
# bucket interval or period of when to aggregate in minutes
stats.period=5

View file

@ -1,7 +1,8 @@
<statisticsConfig>
<!-- Event Type should be fully qualified name of stat event -->
<statisticsEvent type="com.raytheon.uf.common.datadelivery.event.retrieval.SubscriptionRetrievalEvent"
displayName="Subscription Retrieval" category="Data Delivery">
displayName="Subscription Retrieval" category="Data Delivery"
rawOfflineRetentionDays="-1" aggregateOfflineRetentionDays="90">
<statisticsGroup name="plugin" displayName="Data Type" />
<statisticsGroup name="provider" displayName="Data Provider" />
<statisticsGroup name="owner" displayName="Owner" />

View file

@ -1,7 +1,8 @@
<statisticsConfig>
<!-- Event Type should be fully qualified name of stat event -->
<statisticsEvent type="com.raytheon.uf.common.registry.event.RegistryStatisticsEvent"
displayName="Registry Statistics" category="Registry">
displayName="Registry Statistics" category="Registry"
rawOfflineRetentionDays="-1" aggregateOfflineRetentionDays="90">
<statisticsGroup name="owner" displayName="Transaction Owner" />
<statisticsGroup name="status" displayName="Transaction Status" />
<statisticsGroup name="type" displayName="Transaction Type" />

View file

@ -10,6 +10,7 @@ Require-Bundle: com.raytheon.uf.common.serialization;bundle-version="1.12.1174",
com.raytheon.uf.common.event;bundle-version="1.0.0",
com.google.guava;bundle-version="1.0.0",
com.raytheon.uf.edex.database;bundle-version="1.0.0",
com.raytheon.edex.common,
com.raytheon.uf.common.localization;bundle-version="1.12.1174",
com.raytheon.uf.common.dataquery;bundle-version="1.0.0",
com.raytheon.uf.common.time;bundle-version="1.12.1174",

View file

@ -1,41 +0,0 @@
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="statsPurge"
class="com.raytheon.uf.edex.stats.StatsPurge"
depends-on="statsRegister"/>
<bean id="aggregateManager" class="com.raytheon.uf.edex.stats.AggregateManager">
<constructor-arg value="${stats.period}"/>
</bean>
<bean id="edexStatsRegistered" factory-bean="clusteredCamelContextMgr"
factory-method="register" depends-on="persistCamelRegistered">
<constructor-arg ref="edexStats-camel"/>
</bean>
<camelContext id="edexStats-camel"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"
autoStartup="false">
<endpoint id="statsScanTimer" uri="timer://scanStats?period=${stats.scanInterval}m"/>
<route id="statsTableScan">
<from ref="statsScanTimer" />
<doTry>
<bean ref="statsPurge" method="purgeAggregates"/>
<bean ref="aggregateManager" method="scan"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:stats?level=ERROR&amp;showBody=false&amp;showCaughtException=true&amp;showStackTrace=true"/>
</doCatch>
</doTry>
</route>
</camelContext>
</beans>

View file

@ -1,13 +0,0 @@
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="statsGraphDataHandler" class="com.raytheon.uf.edex.stats.handler.GraphDataHandler"/>
<bean factory-bean="handlerRegistry" factory-method="register">
<constructor-arg value="com.raytheon.uf.common.stats.GraphDataRequest"/>
<constructor-arg ref="statsGraphDataHandler"/>
</bean>
</beans>

View file

@ -0,0 +1,66 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="statsPurge" class="com.raytheon.uf.edex.stats.StatsPurge"
depends-on="statsRegister"/>
<bean id="aggregateManager" class="com.raytheon.uf.edex.stats.AggregateManager">
<!-- Not directly exposing at this time, due to performance concerns from
improper values -->
<!-- Bucket interval in minutes for aggregation -->
<constructor-arg value="5"/>
</bean>
<bean id="edexStatsRegistered" factory-bean="clusteredCamelContextMgr"
factory-method="register" depends-on="persistCamelRegistered">
<constructor-arg ref="edexStats-camel"/>
</bean>
<camelContext id="edexStats-camel" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler" autoStartup="false">
<endpoint id="statsScanTimer" uri="timer://scanStats?period=${stats.scanInterval}m"/>
<endpoint id="aggrToCsvTimer"
uri="quartz://stats/aggrToCsv/?cron=${stats.aggregateToCsv.cron}"/>
<endpoint id="statsPurgeTimer" uri="quartz://stats/purge/?cron=${stats.purge.cron}"/>
<route id="statsTableScan">
<from ref="statsScanTimer"/>
<doTry>
<bean ref="aggregateManager" method="scan"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to
uri="log:stats?level=ERROR&amp;showBody=false&amp;showCaughtException=true&amp;showStackTrace=true"/>
</doCatch>
</doTry>
</route>
<route id="statsAggrToCsv">
<from ref="aggrToCsvTimer"/>
<doTry>
<bean ref="aggregateManager" method="offlineAggregates"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to
uri="log:stats?level=ERROR&amp;showBody=false&amp;showCaughtException=true&amp;showStackTrace=true"/>
</doCatch>
</doTry>
</route>
<route id="statsPurgeRoute">
<from ref="statsPurgeTimer"/>
<doTry>
<bean ref="statsPurge" method="purge"/>
<doCatch>
<exception>java.lang.Throwable</exception>
<to
uri="log:stats?level=ERROR&amp;showBody=false&amp;showCaughtException=true&amp;showStackTrace=true"/>
</doCatch>
</doTry>
</route>
</camelContext>
</beans>

View file

@ -1,19 +1,17 @@
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<!-- Need to set up connect between cave and edex
1) The possible combinations to populate drop downs etc
2) Bucketizing so that Cave requests data in 15 minute buckets,
you would need to do the aggregation (still undecided on if this is a cave or edex feature).
-->
<bean id="aggregatedStatsHandler" class="com.raytheon.uf.edex.stats.handler.AggregatedStatsHandler"/>
<bean id="aggregatedStatsHandler" class="com.raytheon.uf.edex.stats.handler.AggregatedStatsHandler"/>
<bean factory-bean="handlerRegistry" factory-method="register">
<constructor-arg value="com.raytheon.uf.common.stats.AggregatedStatsRequest"/>
<constructor-arg ref="aggregatedStatsHandler"/>
<constructor-arg value="com.raytheon.uf.common.stats.AggregatedStatsRequest"/>
<constructor-arg ref="aggregatedStatsHandler"/>
</bean>
<bean id="statsGraphDataHandler" class="com.raytheon.uf.edex.stats.handler.GraphDataHandler"/>
<bean factory-bean="handlerRegistry" factory-method="register">
<constructor-arg value="com.raytheon.uf.common.stats.GraphDataRequest"/>
<constructor-arg ref="statsGraphDataHandler"/>
</bean>
</beans>

View file

@ -1,4 +0,0 @@
# scan interval of stats table in minutes
stats.scanInterval=2
# bucket interval or period of when to aggregate in minutes
stats.period=5

View file

@ -0,0 +1,8 @@
# scan interval of stats table in minutes
stats.scanInterval=2
# When to save off aggregate data to csv format
stats.aggregateToCsv.cron=0+10+*+*+*+?
# When to run purge of aggregate tables and csv files
stats.purge.cron=0+15+*+*+*+?

View file

@ -24,6 +24,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -35,15 +36,15 @@ import javax.xml.bind.JAXBException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.raytheon.uf.common.event.Event;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.stats.StatisticsEvent;
import com.raytheon.uf.common.stats.StatsGrouping;
import com.raytheon.uf.common.stats.StatsGroupingColumn;
import com.raytheon.uf.common.stats.StatsRecord;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
@ -66,10 +67,12 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Stored the aggregate buckets in the db.
* Nov 07, 2012 1317 mpduff Updated Configuration Files.
* Nov 28, 2012 1350 rjpeter Simplied aggregation and added aggregation with current db aggregate records.
* Nov 07, 2012 1317 mpduff Updated Configuration Files.
* Nov 28, 2012 1350 rjpeter Simplied aggregation and added aggregation with current db aggregate records.
* Jan 07, 2013 1451 djohnson Use newGmtCalendar().
* Jan 15, 2013 1487 djohnson Use xml for the grouping information on an {@link AggregateRecord}.
* May 22, 2013 1917 rjpeter Added ability to save raw and aggregate stats, to reclaimSpace every scan call,
* and to not pretty print xml grouping information.
* </pre>
*
* @author jsanchez
@ -96,9 +99,6 @@ public class AggregateManager {
/** default value */
private static final int defaultBucketInterval = 5;
/** default value */
private static final int defaultScanInterval = 15;
public AggregateManager(String bucketInterval) {
validateIntervals(bucketInterval);
}
@ -112,8 +112,10 @@ public class AggregateManager {
* @param timeRange
* @param groupedEvents
*/
private void aggregate(AggregateRecordDao dao, StatisticsEvent statsEvent,
TimeRange timeRange, Multimap<String, Event> groupedEvents) {
private void aggregate(AggregateRecordDao dao,
StatisticsEventConfig statsEvent, TimeRange timeRange,
Multimap<StatsGroupingColumn, StatisticsEvent> groupedEvents)
throws JAXBException {
Calendar start = TimeUtil.newGmtCalendar();
start.setTime(timeRange.getStart());
@ -121,8 +123,10 @@ public class AggregateManager {
end.setTime(timeRange.getEnd());
// perform aggregate functions on the grouped data
for (String groupKey : groupedEvents.keySet()) {
Collection<Event> groupData = groupedEvents.get(groupKey);
for (StatsGroupingColumn group : groupedEvents.keySet()) {
Collection<StatisticsEvent> groupData = groupedEvents.get(group);
String groupKey = JAXB_MANAGER.marshalToXml(group, false);
Iterator<Method> aggrMethodIter = statsEvent.getAggregateMethods()
.iterator();
Iterator<StatisticsAggregate> statAggrIter = statsEvent
@ -138,7 +142,7 @@ public class AggregateManager {
double min = Double.MAX_VALUE;
double sum = 0;
for (Event event : groupData) {
for (StatisticsEvent event : groupData) {
Number number = (Number) m.invoke(event, new Object[0]);
double value = number.doubleValue();
sum += value;
@ -217,9 +221,10 @@ public class AggregateManager {
long t0 = System.currentTimeMillis();
ConfigLoader configLoader = ConfigLoader.getInstance();
StatsDao statsRecordDao = new StatsDao();
OfflineStatsManager offline = new OfflineStatsManager();
AggregateRecordDao aggrRecordDao = new AggregateRecordDao();
Map<String, StatisticsEvent> statsMap = configLoader.getTypeView();
Map<String, StatisticsEventConfig> statsMap = configLoader
.getTypeView();
// latest time to pull
Calendar timeToProcess = Calendar.getInstance(TimeZone
@ -227,9 +232,10 @@ public class AggregateManager {
int count = 0;
// process the events by type
for (Map.Entry<String, StatisticsEvent> entry : statsMap.entrySet()) {
for (Map.Entry<String, StatisticsEventConfig> entry : statsMap
.entrySet()) {
String type = entry.getKey();
StatisticsEvent event = entry.getValue();
StatisticsEventConfig event = entry.getValue();
List<StatsRecord> records = null;
do {
@ -239,10 +245,10 @@ public class AggregateManager {
if (!CollectionUtil.isNullOrEmpty(records)) {
// sort events into time buckets
Map<TimeRange, Multimap<String, Event>> timeMap = sort(
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMap = sort(
event, records);
for (Map.Entry<TimeRange, Multimap<String, Event>> timeMapEntry : timeMap
for (Map.Entry<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMapEntry : timeMap
.entrySet()) {
aggregate(aggrRecordDao, event, timeMapEntry.getKey(),
timeMapEntry.getValue());
@ -255,10 +261,14 @@ public class AggregateManager {
}
count += records.size();
if (event.getRawOfflineRetentionDays() >= 0) {
offline.writeStatsToDisk(event, timeMap);
}
}
} while (!CollectionUtil.isNullOrEmpty(records));
}
statsRecordDao.reclaimSpace();
long t1 = System.currentTimeMillis();
statusHandler.info("Aggregated " + count + " stat events in "
+ (t1 - t0) + " ms");
@ -270,11 +280,11 @@ public class AggregateManager {
* @param records
* @return
*/
private Map<TimeRange, Multimap<String, Event>> sort(
StatisticsEvent statEvent, List<StatsRecord> records) {
Map<TimeRange, Multimap<String, Event>> rval = new HashMap<TimeRange, Multimap<String, Event>>();
private Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> sort(
StatisticsEventConfig statEvent, List<StatsRecord> records) {
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> rval = new HashMap<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>>();
TimeRange timeRange = null;
Multimap<String, Event> eventsByGroup = null;
Multimap<StatsGroupingColumn, StatisticsEvent> eventsByGroup = null;
for (StatsRecord record : records) {
if ((timeRange == null)
@ -290,13 +300,13 @@ public class AggregateManager {
try {
// get underlying event
Event event = SerializationUtil.transformFromThrift(
Event.class, record.getEvent());
StatisticsEvent event = SerializationUtil.transformFromThrift(
StatisticsEvent.class, record.getEvent());
String groupAsString = determineGroupRepresentationForEvent(
StatsGroupingColumn group = determineGroupRepresentationForEvent(
statEvent, event);
if (groupAsString != null) {
eventsByGroup.put(groupAsString, event);
if (group != null) {
eventsByGroup.put(group, event);
}
} catch (Exception e) {
statusHandler
@ -309,10 +319,9 @@ public class AggregateManager {
}
@VisibleForTesting
static String determineGroupRepresentationForEvent(
StatisticsEvent statEvent, Event event)
throws IllegalAccessException, InvocationTargetException,
JAXBException {
static StatsGroupingColumn determineGroupRepresentationForEvent(
StatisticsEventConfig statEvent, StatisticsEvent event)
throws IllegalAccessException, InvocationTargetException {
Iterator<Method> gMethodIter = statEvent.getGroupByMethods().iterator();
Iterator<StatisticsGroup> gFieldNameIter = statEvent.getGroupList()
.iterator();
@ -322,14 +331,13 @@ public class AggregateManager {
Method m = gMethodIter.next();
String field = gFieldNameIter.next().getName();
String gVal = String.valueOf(m.invoke(event, EMPTY_OBJ_ARR));
groupings.add(new StatsGrouping(field, gVal));
}
StatsGroupingColumn column = new StatsGroupingColumn();
column.setGroup(groupings);
return JAXB_MANAGER.marshalToXml(column);
return column;
}
/**
@ -361,4 +369,68 @@ public class AggregateManager {
+ bucketInterval + "'");
}
}
/**
* Scans the aggregate table for aggregate statistics to offline. It doesn't
* process any aggregate from within the 12 hours.
*/
public void offlineAggregates() {
ConfigLoader configLoader = ConfigLoader.getInstance();
OfflineStatsManager offline = new OfflineStatsManager();
AggregateRecordDao aggrRecordDao = new AggregateRecordDao();
Map<String, StatisticsEventConfig> statsMap = configLoader
.getTypeView();
// offline aggregate data older than 6 hours
long maxTime = (System.currentTimeMillis() / TimeUtil.MILLIS_PER_HOUR - 6)
* TimeUtil.MILLIS_PER_HOUR;
for (StatisticsEventConfig conf : statsMap.values()) {
if (conf.getAggregateOfflineRetentionDays() >= 0) {
String eventType = conf.getType();
try {
Date oldestAggregateDate = aggrRecordDao
.getOldestAggregateDate(eventType);
if (oldestAggregateDate != null) {
Date mostRecentOfflineDate = offline
.getMostRecentOfflinedAggregate(conf);
long startHour = oldestAggregateDate.getTime()
/ TimeUtil.MILLIS_PER_HOUR;
if (mostRecentOfflineDate != null) {
// move ahead one hour from most recent time on disk
long offlineHour = mostRecentOfflineDate.getTime()
/ TimeUtil.MILLIS_PER_HOUR + 1;
if (offlineHour > startHour) {
startHour = offlineHour;
}
}
Date startDate = new Date(startHour
* TimeUtil.MILLIS_PER_HOUR);
// process an hour at a time
Date endDate = new Date(startDate.getTime()
+ TimeUtil.MILLIS_PER_HOUR);
while (endDate.getTime() <= maxTime) {
List<AggregateRecord> records = aggrRecordDao
.getAggregates(eventType, startDate,
endDate);
offline.writeAggregatesToDisk(conf, records);
startDate = endDate;
endDate = new Date(startDate.getTime()
+ TimeUtil.MILLIS_PER_HOUR);
}
}
} catch (Exception e) {
statusHandler.error(
"Error occured generating offline aggregates for event "
+ conf.getType(), e);
}
}
}
// zip up old data?
}
}

View file

@ -0,0 +1,599 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.stats;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.TimeZone;
import javax.xml.bind.JAXBException;
import com.google.common.collect.Multimap;
import com.raytheon.edex.util.Util;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.localization.exception.LocalizationException;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.stats.StatisticsEvent;
import com.raytheon.uf.common.stats.StatsGrouping;
import com.raytheon.uf.common.stats.StatsGroupingColumn;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.TimeRange;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.stats.data.StatsDataAccumulator;
/**
* Offlines data to csv format for long term comparison.
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation.
* Nov 09, 2012 dhladky Changed to CSV output
* Jan 24, 2013 1357 mpduff Fix comma output and paths.
* May 22, 2013 1917 rjpeter Renamed from Archiver, added generation of raw statistics,
* added method to purge statistics, moved saving of statistics
* to configured instead of site level.
* </pre>
*
* @author jsanchez
*
*/
public class OfflineStatsManager {
private class StatisticsKey {
private final long epochHours;
public StatisticsKey(Date time) {
this.epochHours = time.getTime() / TimeUtil.MILLIS_PER_HOUR;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (epochHours ^ (epochHours >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StatisticsKey other = (StatisticsKey) obj;
if (!getOuterType().equals(other.getOuterType())) {
return false;
}
if (epochHours != other.epochHours) {
return false;
}
return true;
}
private OfflineStatsManager getOuterType() {
return OfflineStatsManager.this;
}
}
private static final String COMMA = ",";
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(OfflineStatsManager.class);
private final IPathManager pm = PathManagerFactory.getPathManager();
private final LocalizationContext configuredContext = pm.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.CONFIGURED);
private final SimpleDateFormat fieldSdf;
private final SimpleDateFormat directorySdf;
private final SimpleDateFormat fileSdf;
private final DecimalFormat avgFormatter = new DecimalFormat("0.######");
public OfflineStatsManager() {
TimeZone gmt = TimeZone.getTimeZone("GMT");
fieldSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
fieldSdf.setTimeZone(gmt);
directorySdf = new SimpleDateFormat("yyyyMMdd");
directorySdf.setTimeZone(gmt);
fileSdf = new SimpleDateFormat("yyyyMMddHH");
fileSdf.setTimeZone(gmt);
}
/**
* Gets a directory name in the format stats/[rawStats|aggregates]/StatType
*
* @param conf
* @param isAggregate
* @return
*/
private String getBaseDirectory(StatisticsEventConfig conf,
boolean isAggregate) {
StringBuffer sb = new StringBuffer(40);
sb.append("stats").append(File.separatorChar);
if (isAggregate) {
sb.append("aggregates");
} else {
sb.append("rawStats");
}
sb.append(File.separatorChar).append(conf.getTypeClass().getName());
return sb.toString();
}
/**
* Creates a filename in the format
* stats/[rawStats|aggregates]/StatType/yyyyMMdd/StatType_yyyyMMddHH.csv
*
* @param conf
* @param isAggregate
* @param epochHours
* @return
*/
private String getStatFilename(StatisticsEventConfig conf,
boolean isAggregate, long epochHours) {
String baseName = getBaseDirectory(conf, isAggregate);
StringBuilder sb = new StringBuilder(baseName.length() + 40);
Date time = new Date(epochHours * TimeUtil.MILLIS_PER_HOUR);
sb.append(baseName).append(File.separatorChar)
.append(directorySdf.format(time)).append(File.separatorChar)
.append(conf.getTypeClass().getSimpleName()).append("_")
.append(fileSdf.format(time)).append(".csv");
return sb.toString();
}
/**
* Writes a raw statistic in CSV format to the passed BufferedWriter.
*
* @param bw
* @param conf
* @param grouping
* @param event
* @throws IOException
*/
private void writeCSVOutput(BufferedWriter bw, StatisticsEventConfig conf,
StatsGroupingColumn grouping, StatisticsEvent event)
throws IOException {
Calendar time = event.getDate();
if (time != null) {
bw.write(fieldSdf.format(time.getTime()));
}
for (StatsGrouping group : grouping.getGroup()) {
bw.write(COMMA);
bw.write(group.getValue());
}
for (Method m : conf.getAggregateMethods()) {
try {
bw.write(COMMA);
Number number = (Number) m.invoke(event, new Object[0]);
bw.write(number.toString());
} catch (Exception e) {
statusHandler.error(
"Unable to aggregate '" + m.getName() + "'", e);
}
}
bw.newLine();
}
/**
* Writes the aggregate statistic to the passed BufferedWriter.
*
* @param bw
* @param conf
* @param agg
* @throws IOException
*/
private void writeCSVOutput(BufferedWriter bw, StatisticsEventConfig conf,
AggregateRecord agg) throws IOException {
Calendar startDate = agg.getStartDate();
Calendar endDate = agg.getEndDate();
double sum = agg.getSum();
double count = agg.getCount();
if (startDate != null) {
bw.write(fieldSdf.format(startDate.getTime()));
}
bw.write(COMMA);
if (endDate != null) {
bw.write(fieldSdf.format(endDate.getTime()));
}
StatsGroupingColumn grouping = StatsDataAccumulator
.unmarshalGroupingColumnFromRecord(agg);
for (StatsGrouping group : grouping.getGroup()) {
bw.write(COMMA);
bw.write(group.getValue());
}
bw.write(COMMA);
bw.write(agg.getField());
bw.write(COMMA);
if (count > 0) {
bw.write(avgFormatter.format(sum / count));
} else {
bw.write("0");
}
bw.write(COMMA);
bw.write(String.valueOf(agg.getMin()));
bw.write(COMMA);
bw.write(String.valueOf(agg.getMax()));
bw.write(COMMA);
bw.write(String.valueOf(sum));
bw.write(COMMA);
bw.write(String.valueOf(count));
bw.newLine();
}
/**
* Opens a buffered writer for the given StatisticsKey and
* StatisticsEventConfig. If its a new CSV file a header is also added to
* the file.
*
* @param key
* @param conf
* @return
* @throws IOException
*/
private BufferedWriter getStatEventBufferedWriter(StatisticsKey key,
StatisticsEventConfig conf) throws IOException {
BufferedWriter bw = null;
LocalizationFile siteLocalization = pm
.getLocalizationFile(configuredContext,
getStatFilename(conf, false, key.epochHours));
File outFile = siteLocalization.getFile();
boolean addHeader = outFile.length() == 0;
if (addHeader) {
// pre-create directories if necessary
outFile.getParentFile().mkdirs();
}
bw = new BufferedWriter(new FileWriter(outFile, true));
if (addHeader) {
bw.write("Time");
for (StatisticsGroup group : conf.getGroupList()) {
bw.write(COMMA);
bw.write(group.getDisplayName());
}
for (StatisticsAggregate aggr : conf.getAggregateList()) {
bw.write(COMMA);
bw.write(aggr.getDisplayName());
}
bw.newLine();
}
return bw;
}
/**
* Opens a buffered writer for the given StatisticsKey and
* StatisticsEventConfig. If its a new CSV file a header is also added to
* the file.
*
* @param key
* @param conf
* @return
* @throws IOException
*/
private BufferedWriter getAggregateBufferedWriter(StatisticsKey key,
StatisticsEventConfig conf) throws IOException {
BufferedWriter bw = null;
LocalizationFile siteLocalization = pm.getLocalizationFile(
configuredContext, getStatFilename(conf, true, key.epochHours));
File outFile = siteLocalization.getFile();
boolean addHeader = outFile.length() == 0;
if (addHeader) {
// pre-create directories if necessary
outFile.getParentFile().mkdirs();
}
bw = new BufferedWriter(new FileWriter(outFile, true));
if (addHeader) {
bw.write("Start,End,");
for (StatisticsGroup group : conf.getGroupList()) {
bw.write(group.getDisplayName());
bw.write(COMMA);
}
bw.write("Field,Avg,Min,Max,Sum,Count");
bw.newLine();
}
return bw;
}
/**
* Writes the raw statistics to disk in CSV format.
*
* @param conf
* @param timeMap
*/
public void writeStatsToDisk(
StatisticsEventConfig conf,
Map<TimeRange, Multimap<StatsGroupingColumn, StatisticsEvent>> timeMap) {
if (!timeMap.isEmpty()) {
String outfilePath = null;
BufferedWriter bw = null;
try {
for (Multimap<StatsGroupingColumn, StatisticsEvent> groupedEvents : timeMap
.values()) {
for (StatsGroupingColumn group : groupedEvents.keySet()) {
Iterator<StatisticsEvent> iter = groupedEvents.get(
group).iterator();
StatisticsKey prevKey = null;
while (iter.hasNext()) {
StatisticsEvent event = iter.next();
StatisticsKey curKey = new StatisticsKey(event
.getDate().getTime());
if (!curKey.equals(prevKey)) {
Util.close(bw);
bw = getStatEventBufferedWriter(curKey, conf);
}
writeCSVOutput(bw, conf, group, event);
}
}
}
} catch (IOException e) {
statusHandler.handle(Priority.ERROR, "Failed to write File: "
+ outfilePath, e);
} finally {
Util.close(bw);
}
}
}
/**
* Writes the aggregate records to disk in CSV format.
*
* @param conf
* The StatisticsEventConfig the aggregates belong to
* @param aggregateRecords
* The aggregate records
* @throws JAXBException
*/
public void writeAggregatesToDisk(StatisticsEventConfig conf,
Collection<AggregateRecord> aggregateRecords) {
if (!aggregateRecords.isEmpty()) {
String outfilePath = null;
BufferedWriter bw = null;
try {
Iterator<AggregateRecord> iter = aggregateRecords.iterator();
StatisticsKey prevKey = null;
while (iter.hasNext()) {
AggregateRecord agg = iter.next();
StatisticsKey curKey = new StatisticsKey(agg.getStartDate()
.getTime());
if (!curKey.equals(prevKey)) {
Util.close(bw);
bw = getAggregateBufferedWriter(curKey, conf);
}
writeCSVOutput(bw, conf, agg);
}
} catch (IOException e) {
statusHandler.handle(Priority.ERROR, "Failed to write File: "
+ outfilePath, e);
} finally {
Util.close(bw);
}
}
}
/**
* Returns the most recent offlined date for the given
* StatisticsEventConfig.
*
* @param conf
* @return
* @throws LocalizationException
* @throws IOException
*/
public Date getMostRecentOfflinedAggregate(StatisticsEventConfig conf)
throws LocalizationException, IOException {
Date rval = null;
LocalizationFile siteLocalization = pm.getLocalizationFile(
configuredContext, getBaseDirectory(conf, true));
File eventDir = siteLocalization.getFile(true);
if (eventDir.exists() && eventDir.isDirectory()) {
File latestDir = null;
for (File handle : eventDir.listFiles()) {
if (handle.isDirectory()) {
try {
Date handleDate = directorySdf.parse(handle.getName());
if ((rval == null) || rval.before(handleDate)) {
rval = handleDate;
latestDir = handle;
}
} catch (ParseException e) {
statusHandler.handle(Priority.WARN, "Directory ["
+ handle.getAbsolutePath()
+ "] is not in expected date format ["
+ directorySdf.toPattern() + "]");
}
}
}
// found latest directory date
if (latestDir != null) {
for (File csv : latestDir.listFiles()) {
String name = csv.getName();
if (csv.isFile() && name.endsWith(".csv")) {
// StatType_yyyyMMddHH.csv
int index = name.indexOf('_');
if (index >= 0) {
try {
Date handleDate = fileSdf.parse(name.substring(
index + 1, index + 11));
if ((rval == null) || rval.before(handleDate)) {
rval = handleDate;
}
} catch (ParseException e) {
statusHandler.handle(Priority.WARN, "File ["
+ csv.getAbsolutePath()
+ "] is not in expected date format ["
+ fileSdf.toPattern() + "]");
}
}
}
}
}
}
return rval;
}
/**
* Handle retention day rules, -1 keep nothing, 0 keep everything, any
* positive number keep that many full days.
*
* @param retentionDays
* @return
*/
private long getMinTime(int retentionDays) {
long currentDay = System.currentTimeMillis() / TimeUtil.MILLIS_PER_DAY;
if (retentionDays == 0) {
return 0;
} else if (retentionDays < 0) {
return currentDay * TimeUtil.MILLIS_PER_DAY;
} else {
// add 1 day to not include current day
return (currentDay - (retentionDays + 1)) * TimeUtil.MILLIS_PER_DAY;
}
}
/**
* Purges offline statistics directories for the given
* StatisticsEventConfig.
*
* @param conf
* @return
*/
public void purgeOffline(StatisticsEventConfig conf) {
// purge aggregates
long minTime = getMinTime(conf.getAggregateOfflineRetentionDays());
if (minTime > 0) {
purgeDir(getBaseDirectory(conf, true), minTime);
}
// purge raw
minTime = getMinTime(conf.getRawOfflineRetentionDays());
if (minTime > 0) {
purgeDir(getBaseDirectory(conf, false), minTime);
}
}
/**
* Purges a given stat event dir keeping any directories newer than minTime.
*
* @param dir
* @param minTime
*/
private void purgeDir(String dir, long minTime) {
LocalizationFile siteLocalization = pm.getLocalizationFile(
configuredContext, dir);
File eventDir = siteLocalization.getFile();
if (eventDir.exists() && eventDir.isDirectory()) {
try {
for (File handle : eventDir.listFiles()) {
if (handle.isDirectory()) {
try {
Date handleDate = directorySdf.parse(handle
.getName());
if (handleDate.getTime() <= minTime) {
FileUtil.deleteDir(handle);
}
} catch (ParseException e) {
statusHandler.warn("Directory ["
+ handle.getAbsolutePath()
+ "] is not in expected date format ["
+ directorySdf.toPattern() + "]");
}
}
}
} catch (Exception e) {
statusHandler.error(
"Error occurred purging " + eventDir.getAbsolutePath(),
e);
}
}
}
}

View file

@ -25,14 +25,13 @@ import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import javax.xml.bind.JAXBException;
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.stats.StatsRecord;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.database.DataAccessLayerException;
@ -41,21 +40,18 @@ import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.database.purge.PurgeRule;
import com.raytheon.uf.edex.database.purge.PurgeRuleSet;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
import com.raytheon.uf.edex.stats.util.Archiver;
import com.raytheon.uf.edex.stats.util.ConfigLoader;
/**
* Purges the stats table of expired/unused stat records. Purges the aggregate
* table and write it to disk.
*
* *
* Purges the stats table of expired/unused stat records.
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation.
*
* Aug 21, 2012 jsanchez Initial creation.
* May 22, 2013 1917 rjpeter Added purging off offline statistics.
* </pre>
*
* @author jsanchez
@ -66,8 +62,6 @@ public class StatsPurge {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(StatsPurge.class);
private Archiver archiver;
private final CoreDao aggregateRecordDao = new CoreDao(DaoConfig.forClass(
"metadata", AggregateRecord.class));
@ -81,57 +75,53 @@ public class StatsPurge {
public StatsPurge() {
aggregatePurgeRules = readPurgeRules("aggregatePurgeRules.xml");
statsPurgeRules = readPurgeRules("statsPurgeRules.xml");
try {
archiver = new Archiver();
purgeStats();
} catch (DataAccessLayerException e) {
statusHandler
.error("Error purging stats on start up. Stats will not be purged. ",
e);
}
public void purge() {
purgeAggregates();
purgeStats();
// purge offline stats
OfflineStatsManager offlineStats = new OfflineStatsManager();
ConfigLoader loader = ConfigLoader.getInstance();
for (StatisticsEventConfig conf : loader.getTypeView().values()) {
offlineStats.purgeOffline(conf);
}
}
/**
* Purges records from the aggregate table and writes them to disk.
*/
public void purgeAggregates() throws JAXBException,
DataAccessLayerException {
public void purgeAggregates() {
if (aggregatePurgeRules != null) {
Calendar expiration = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
DatabaseQuery query = new DatabaseQuery(AggregateRecord.class);
List<PurgeRule> allRules = new ArrayList<PurgeRule>();
try {
Calendar expiration = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
DatabaseQuery deleteStmt = new DatabaseQuery(
AggregateRecord.class);
List<PurgeRule> allRules = new ArrayList<PurgeRule>();
// check for specific rules, if none, apply defaults
if (!aggregatePurgeRules.getRules().isEmpty()) {
allRules.addAll(aggregatePurgeRules.getRules());
} else if (!aggregatePurgeRules.getDefaultRules().isEmpty()) {
allRules.addAll(aggregatePurgeRules.getDefaultRules());
}
// check for specific rules, if none, apply defaults
if (!aggregatePurgeRules.getRules().isEmpty()) {
allRules.addAll(aggregatePurgeRules.getRules());
} else if (!aggregatePurgeRules.getDefaultRules().isEmpty()) {
allRules.addAll(aggregatePurgeRules.getDefaultRules());
}
for (PurgeRule rule : allRules) {
if (rule.isPeriodSpecified()) {
long ms = rule.getPeriodInMillis();
int minutes = new Long(ms / (1000 * 60)).intValue();
expiration.add(Calendar.MINUTE, -minutes);
for (PurgeRule rule : allRules) {
if (rule.isPeriodSpecified()) {
long ms = rule.getPeriodInMillis();
int minutes = new Long(ms / (1000 * 60)).intValue();
expiration.add(Calendar.MINUTE, -minutes);
query.addQueryParam("endDate", expiration,
QueryOperand.LESSTHAN);
deleteStmt.addQueryParam("endDate", expiration,
QueryOperand.LESSTHAN);
List<?> objects = aggregateRecordDao.queryByCriteria(query);
if (!objects.isEmpty()) {
AggregateRecord[] aggregateRecords = new AggregateRecord[objects
.size()];
for (int i = 0; i < aggregateRecords.length; i++) {
aggregateRecords[i] = (AggregateRecord) objects
.get(i);
}
archiver.writeToDisk(aggregateRecords);
aggregateRecordDao.deleteAll(objects);
aggregateRecordDao.deleteByCriteria(deleteStmt);
}
}
} catch (DataAccessLayerException e) {
statusHandler.error("Error purging stats aggregates", e);
}
}
}
@ -140,21 +130,25 @@ public class StatsPurge {
* Purges records from the stats table if they are older than the expiration
* time.
*/
private void purgeStats() throws DataAccessLayerException {
private void purgeStats() {
if (statsPurgeRules != null) {
Calendar expiration = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
DatabaseQuery deleteStmt = new DatabaseQuery(StatsRecord.class);
try {
Calendar expiration = Calendar.getInstance(TimeZone
.getTimeZone("GMT"));
DatabaseQuery deleteStmt = new DatabaseQuery(StatsRecord.class);
for (PurgeRule rule : statsPurgeRules.getRules()) {
if (rule.isPeriodSpecified()) {
long ms = rule.getPeriodInMillis();
int minutes = new Long(ms / (1000 * 60)).intValue();
expiration.add(Calendar.MINUTE, -minutes);
deleteStmt.addQueryParam("date", expiration,
QueryOperand.LESSTHAN);
statsRecordDao.deleteByCriteria(deleteStmt);
for (PurgeRule rule : statsPurgeRules.getRules()) {
if (rule.isPeriodSpecified()) {
long ms = rule.getPeriodInMillis();
int minutes = new Long(ms / (1000 * 60)).intValue();
expiration.add(Calendar.MINUTE, -minutes);
deleteStmt.addQueryParam("date", expiration,
QueryOperand.LESSTHAN);
statsRecordDao.deleteByCriteria(deleteStmt);
}
}
} catch (DataAccessLayerException e) {
statusHandler.error("Error purging stats aggregates", e);
}
}
}

View file

@ -1,16 +1,60 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.stats.dao;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.hibernate.Query;
import org.hibernate.Session;
import org.hibernate.Transaction;
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
/**
* Record class for stats waiting to be stored in the appropriate bucket.
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation
* May 22, 2013 1917 rjpeter Added query methods for retrieving data about aggregates.
* </pre>
*
* @author jsanchez
*/
public class AggregateRecordDao extends CoreDao {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(AggregateRecordDao.class);
/**
* Creates a new data access object
*/
@ -61,4 +105,109 @@ public class AggregateRecordDao extends CoreDao {
persist(newRecord);
}
}
/**
* Returns the oldest start date for a given aggregate eventType.
*
* @param eventType
* @return
* @throws DataAccessLayerException
*/
public Date getOldestAggregateDate(final String eventType)
throws DataAccessLayerException {
Session sess = null;
Transaction tx = null;
try {
sess = getHibernateTemplate().getSessionFactory().openSession();
tx = sess.beginTransaction();
Query query = sess
.createQuery("SELECT MIN(startDate) FROM AggregateRecord WHERE eventType = ?");
query.setString(0, eventType);
Calendar rval = (Calendar) query.uniqueResult();
tx.commit();
if (rval != null) {
return rval.getTime();
}
return null;
} catch (Exception e) {
if (tx != null) {
try {
tx.rollback();
} catch (Exception e1) {
statusHandler.error(
"Error occurred rolling back transaction", e1);
}
}
throw new DataAccessLayerException(
"Unable to look up min start date for event [" + eventType
+ "]", e);
} finally {
if (sess != null) {
try {
sess.close();
} catch (Exception e) {
statusHandler.error(
"Error occurred closing database session", e);
}
}
}
}
/**
* Returns all aggregates of a given type and such that startDate >=
* event.startDate < endDate.
*
* @param eventType
* @param startDate
* @param endDate
* @return
* @throws DataAccessLayerException
*/
public List<AggregateRecord> getAggregates(final String eventType,
final Date startDate, final Date endDate)
throws DataAccessLayerException {
Session sess = null;
Transaction tx = null;
try {
sess = getHibernateTemplate().getSessionFactory().openSession();
tx = sess.beginTransaction();
Query query = sess
.createQuery("FROM AggregateRecord WHERE eventType = ? AND startDate >= ? AND startDate < ? ORDER BY startDate");
query.setString(0, eventType);
query.setTimestamp(1, startDate);
query.setTimestamp(2, endDate);
@SuppressWarnings("unchecked")
List<AggregateRecord> rval = query.list();
tx.commit();
return rval;
} catch (Exception e) {
if (tx != null) {
try {
tx.rollback();
} catch (Exception e1) {
statusHandler.error(
"Error occurred rolling back transaction", e1);
}
}
throw new DataAccessLayerException(
"Unable to look up aggregates for event [" + eventType
+ "]", e);
} finally {
if (sess != null) {
try {
sess.close();
} catch (Exception e) {
statusHandler.error(
"Error occurred closing database session", e);
}
}
}
}
}

View file

@ -1,8 +1,30 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.stats.dao;
import java.util.Calendar;
import java.util.List;
import org.hibernate.Query;
import org.hibernate.StatelessSession;
import com.raytheon.uf.common.dataquery.db.QueryParam.QueryOperand;
import com.raytheon.uf.common.stats.StatsRecord;
import com.raytheon.uf.edex.database.DataAccessLayerException;
@ -10,6 +32,20 @@ import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
/**
* Data access object for raw statistics.
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation
* May 22, 2013 1917 rjpeter Added reclaimSpace.
* </pre>
*
* @author jsanchez
*/
public class StatsDao extends CoreDao {
/**
* Creates a new data access object
@ -43,4 +79,35 @@ public class StatsDao extends CoreDao {
return (List<StatsRecord>) queryByCriteria(query);
}
/**
* Manually runs vacuum due to large numbers of inserts and deletes to keep
* table size to a minimum.
*/
public void reclaimSpace() {
StatelessSession sess = null;
try {
sess = getHibernateTemplate().getSessionFactory()
.openStatelessSession();
// vacuum can't run within a transaction, hack to allow vacuum to
// run from within hibernate
Query query = sess
.createSQLQuery("rollback; VACUUM ANALYZE events.stats");
query.executeUpdate();
statusHandler.info("stats vacuumed");
} catch (Exception e) {
statusHandler.error(
"Error occurred running VACUUM on events.stats", e);
} finally {
if (sess != null) {
try {
sess.close();
} catch (Exception e) {
statusHandler.error(
"Error occurred closing database session", e);
}
}
}
}
}

View file

@ -56,10 +56,10 @@ import com.raytheon.uf.common.util.CollectionUtil;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 15, 2012 728 mpduff Initial creation
* Nov 15, 2012 728 mpduff Initial creation
* Jan 15, 2013 1487 djohnson Use xml for the grouping information on an {@link AggregateRecord}.
* Jan 17, 2013 1357 mpduff Remove unit conversions, add time step, other cleanup.
*
* Jan 17, 2013 1357 mpduff Remove unit conversions, add time step, other cleanup.
* May 22, 2013 1917 rjpeter Made unmarshalGroupingColumnFromRecord public.
* </pre>
*
* @author mpduff
@ -268,7 +268,7 @@ public class StatsDataAccumulator {
* @return the unmarshalled column, or an empty column if unable to
* unmarshal
*/
private static StatsGroupingColumn unmarshalGroupingColumnFromRecord(
public static StatsGroupingColumn unmarshalGroupingColumnFromRecord(
AggregateRecord record) {
String groupingXmlAsString = record.getGrouping();
try {

View file

@ -32,7 +32,7 @@ import com.raytheon.uf.common.stats.GraphDataResponse;
import com.raytheon.uf.common.stats.data.GraphData;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
@ -49,9 +49,9 @@ import com.raytheon.uf.edex.stats.util.ConfigLoader;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 11, 2012 728 mpduff Initial creation
* Sep 11, 2012 728 mpduff Initial creation
* Jan 07, 2013 1451 djohnson Use newGmtCalendar().
*
* May 22, 2013 1917 rjpeter Renamed StatisticsEvent to StatisticsEventConfig.
* </pre>
*
* @author mpduff
@ -199,7 +199,7 @@ public class GraphDataHandler implements IRequestHandler<GraphDataRequest> {
for (StatisticsConfig config : configList) {
for (String cat : config.getCategories()) {
if (cat.equals(category)) {
for (StatisticsEvent event : config.getEvents()) {
for (StatisticsEventConfig event : config.getEvents()) {
if (event.getType().equals(type)) {
for (StatisticsAggregate agg : event
.getAggregateList()) {

View file

@ -32,7 +32,7 @@ import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.stats.StatsRecord;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.database.dao.CoreDao;
@ -75,7 +75,7 @@ public class StatsHandler {
public static void setValidEventTypes(List<StatisticsConfig> configurations) {
validEventTypes = new HashSet<String>();
for (StatisticsConfig config : configurations) {
for (StatisticsEvent event : config.getEvents()) {
for (StatisticsEventConfig event : config.getEvents()) {
validEventTypes.add(event.getType());
}
}
@ -103,7 +103,7 @@ public class StatsHandler {
HashSet<String> myValidEventTypes = new HashSet<String>();
for (StatisticsConfig config : configLoader.getConfigurations()) {
for (StatisticsEvent event : config.getEvents()) {
for (StatisticsEventConfig event : config.getEvents()) {
myValidEventTypes.add(event.getType());
}
}

View file

@ -1,276 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.stats.util;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBException;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.stats.AggregateRecord;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.TimeRange;
/**
* Archives the data in the aggregate_bucket table to an xml file.
*
* <pre>
*
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Initial creation.
* Nov 09, 2012 dhladky Changed to CSV output
* Jan 24, 2013 1357 mpduff Fix comma output and paths.
*
* </pre>
*
* @author jsanchez
*
*/
public class Archiver {
private class StatisticsKey {
public String eventType;
public String grouping;
public TimeRange timeRange;
@Override
public boolean equals(Object o) {
if (o != null && o instanceof StatisticsKey) {
StatisticsKey other = (StatisticsKey) o;
return eventType.equals(other.eventType)
&& timeRange.getStart().equals(
other.timeRange.getStart())
&& timeRange.getEnd().equals(other.timeRange.getEnd());
}
return false;
}
@Override
public int hashCode() {
return 1;
}
}
private static final String COMMA = ",";
private static final Pattern NLPattern = Pattern.compile("[\\n\\r]+");
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(Archiver.class);
private final IPathManager pm = PathManagerFactory.getPathManager();
private final LocalizationContext context = pm.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.SITE);
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final String FILE_DATE_FORMAT = "yyyyMMdd_HHmm";
private static final Pattern PERIOD_PATTERN = Pattern.compile("\\.");
public Archiver() {
}
/**
* Creates a filename in the format /stats/aggregates/group...
* /eventType.start-end.dat
*
* @param items
* @return
*/
private String createFilename(TimeRange tr, String eventType) {
SimpleDateFormat fileDateFormatter = new SimpleDateFormat(
FILE_DATE_FORMAT);
StringBuilder sb = new StringBuilder("stats/aggregates");
String[] chunks = PERIOD_PATTERN.split(eventType);
sb.append("/");
sb.append(chunks[chunks.length - 1]);
sb.append(".");
sb.append(fileDateFormatter.format(tr.getStart()));
sb.append("-");
sb.append(fileDateFormatter.format(tr.getEnd()));
sb.append(".csv");
return sb.toString();
}
/**
* Used for outputting the stats as CSV
*
* @return
*/
private String getCSVOutput(AggregateRecord agrec,
SimpleDateFormat dateFormat) {
StringBuilder sb = new StringBuilder();
String eventType = agrec.getEventType();
Calendar startDate = agrec.getStartDate();
Calendar endDate = agrec.getEndDate();
String grouping = agrec.getGrouping();
String field = agrec.getField();
double max = agrec.getMax();
double min = agrec.getMin();
double sum = agrec.getSum();
double count = agrec.getCount();
if (eventType != null) {
sb.append(eventType);
}
sb.append(COMMA);
if (startDate != null) {
sb.append(dateFormat.format(startDate.getTime()));
}
sb.append(COMMA);
if (endDate != null) {
sb.append(dateFormat.format(endDate.getTime()));
}
sb.append(COMMA);
if (grouping != null) {
sb.append(NLPattern.matcher(grouping).replaceAll(""));
}
sb.append(COMMA);
if (field != null) {
sb.append(field);
}
sb.append(COMMA);
sb.append(max).append(COMMA);
sb.append(min).append(COMMA);
sb.append(sum).append(COMMA);
sb.append(count);
return sb.toString();
}
/**
* Writes the aggregate records to disk.
*
* @param aggregateRecords
* @throws JAXBException
*/
public void writeToDisk(AggregateRecord[] aggregateRecords) {
Map<StatisticsKey, List<AggregateRecord>> statisticsMap = new HashMap<StatisticsKey, List<AggregateRecord>>();
for (AggregateRecord record : aggregateRecords) {
StatisticsKey key = new StatisticsKey();
key.eventType = record.getEventType();
key.grouping = record.getGrouping();
key.timeRange = new TimeRange(record.getStartDate(),
record.getEndDate());
List<AggregateRecord> aggregateRecordList = statisticsMap.get(key);
if (aggregateRecordList == null) {
aggregateRecordList = new ArrayList<AggregateRecord>();
statisticsMap.put(key, aggregateRecordList);
}
aggregateRecordList.add(record);
}
for (StatisticsKey key : statisticsMap.keySet()) {
String eventType = key.eventType;
List<AggregateRecord> records = statisticsMap.get(key);
String filename = createFilename(key.timeRange, eventType);
try {
writeToFile(filename, records);
} catch (JAXBException e) {
statusHandler.error("Unable to write statistics file "
+ filename, e);
}
}
}
/**
* Writes the statistics xml to disk.
*
* @param statistics
* @throws JAXBException
*/
public void writeToFile(String filename, List<AggregateRecord> records)
throws JAXBException {
BufferedWriter bw = null;
SimpleDateFormat dateFormatter = new SimpleDateFormat(DATE_FORMAT);
LocalizationFile siteLocalization = pm.getLocalizationFile(context,
filename);
String outputFilePath = siteLocalization.getFile().getAbsolutePath();
// pre-create directories if necessary
siteLocalization.getFile().getParentFile().mkdirs();
// Write this to output CSV
try {
bw = new BufferedWriter(new FileWriter(outputFilePath));
if (bw != null) {
for (AggregateRecord agrec : records) {
bw.write(getCSVOutput(agrec, dateFormatter));
bw.newLine();
}
}
} catch (IOException e) {
statusHandler.handle(Priority.ERROR, "Failed to write File: "
+ outputFilePath, e);
} finally {
if (bw != null) {
try {
bw.close();
} catch (IOException e) {
statusHandler.handle(Priority.PROBLEM,
"failed to close CSV output file stream. "
+ filename, e);
}
}
}
}
}

View file

@ -41,7 +41,7 @@ import com.raytheon.uf.common.localization.exception.LocalizationException;
import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.stats.xml.StatisticsAggregate;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.stats.xml.StatisticsGroup;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
@ -58,11 +58,12 @@ import com.raytheon.uf.common.util.ReflectionUtil;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Aug 21, 2012 jsanchez Updated error handling and validated config files.
* Nov 07, 2012 1317 mpduff Update config files.
* Nov 29, 2012 1350 rjpeter Updated to static, fixed localization, increased validation.
* Jan 15, 2013 1487 djohnson Make validate() static and public, so it can be run independently.
* Mar 27, 2013 1834 mpduff Filter for xml files on localization file read, wrap unmarshall and
* Nov 07, 2012 1317 mpduff Update config files.
* Nov 29, 2012 1350 rjpeter Updated to static, fixed localization, increased validation.
* Jan 15, 2013 1487 djohnson Make validate() static and public, so it can be run independently.
* Mar 27, 2013 1834 mpduff Filter for xml files on localization file read, wrap unmarshall and
* log error if one occurs
* May 22, 2013 1917 rjpeter Updated validate to save typeClass back to StatisticsEventConfig.
* </pre>
*
* @author jsanchez
@ -81,7 +82,7 @@ public class ConfigLoader {
private List<StatisticsConfig> configurations = Collections.emptyList();
private Map<String, StatisticsEvent> classToEventMap = Collections
private Map<String, StatisticsEventConfig> classToEventMap = Collections
.emptyMap();
private static final String STATS_DIR = "stats";
@ -113,7 +114,7 @@ public class ConfigLoader {
*
* @return
*/
public Map<String, StatisticsEvent> getTypeView() {
public Map<String, StatisticsEventConfig> getTypeView() {
return classToEventMap;
}
@ -144,7 +145,7 @@ public class ConfigLoader {
if (!statConfs.isEmpty()) {
List<StatisticsConfig> myConfigurations = new ArrayList<StatisticsConfig>(
statConfs.size());
Map<String, StatisticsEvent> myEvents = new HashMap<String, StatisticsEvent>();
Map<String, StatisticsEventConfig> myEvents = new HashMap<String, StatisticsEventConfig>();
for (LocalizationFile lf : statConfs.values()) {
try {
@ -174,17 +175,17 @@ public class ConfigLoader {
* @param config
*/
@VisibleForTesting
public static void validate(Map<String, StatisticsEvent> eventMap,
public static void validate(Map<String, StatisticsEventConfig> eventMap,
StatisticsConfig config) {
for (Iterator<StatisticsEvent> iter = config.getEvents().iterator(); iter
.hasNext();) {
StatisticsEvent event = iter.next();
for (Iterator<StatisticsEventConfig> iter = config.getEvents()
.iterator(); iter.hasNext();) {
StatisticsEventConfig event = iter.next();
String eventType = event.getType();
if (!eventMap.containsKey(eventType)) {
try {
Class<?> clazz = Class.forName(eventType);
// verify the type is an Event
clazz.asSubclass(Event.class);
event.setTypeClass(clazz.asSubclass(Event.class));
// validate groupBy fields can be found
List<StatisticsGroup> groups = event.getGroupList();

View file

@ -1,7 +1,9 @@
<statisticsConfig>
<!-- Event Type should be fully qualified name of stat event -->
<!-- raw and aggregate OfflineRetentionDays: Value less than zero disables saving of raw statistic, zero is never purge -->
<statisticsEvent type="com.raytheon.uf.common.stats.ProcessEvent"
displayName="Processing Events" category="Data Ingest Events">
displayName="Processing Events" category="Data Ingest Events"
rawOfflineRetentionDays="-1" aggregateOfflineRetentionDays="90">
<statisticsGroup name="dataType" displayName="Data Type" />
<!-- Processing time available display units:
ms, Seconds, Minutes, Hours -->

View file

@ -44,7 +44,7 @@ import com.raytheon.uf.common.serialization.JAXBManager;
import com.raytheon.uf.common.stats.StatsGrouping;
import com.raytheon.uf.common.stats.StatsGroupingColumn;
import com.raytheon.uf.common.stats.xml.StatisticsConfig;
import com.raytheon.uf.common.stats.xml.StatisticsEvent;
import com.raytheon.uf.common.stats.xml.StatisticsEventConfig;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.stats.util.ConfigLoader;
@ -70,8 +70,7 @@ public class AggregateManagerTest {
@BeforeClass
public static void classSetUp() throws JAXBException {
jaxbManager = new JAXBManager(StatisticsConfig.class,
StatsGroupingColumn.class);
jaxbManager = new JAXBManager(StatisticsConfig.class);
}
@Before
@ -90,7 +89,8 @@ public class AggregateManagerTest {
final StatisticsConfig statisticsConfig = lf.jaxbUnmarshal(
StatisticsConfig.class, jaxbManager);
ConfigLoader.validate(Maps.<String, StatisticsEvent> newHashMap(),
ConfigLoader.validate(
Maps.<String, StatisticsEventConfig> newHashMap(),
statisticsConfig);
MockEvent mockEvent = new MockEvent();
@ -102,15 +102,13 @@ public class AggregateManagerTest {
List<StatsGrouping> groupList = new ArrayList<StatsGrouping>();
groupList.add(new StatsGrouping("pluginName", "somePlugin"));
groupList.add(new StatsGrouping("fileName", "someFileName"));
StatsGroupingColumn column = new StatsGroupingColumn();
column.setGroup(groupList);
StatsGroupingColumn expectedGroupingColumn = new StatsGroupingColumn();
expectedGroupingColumn.setGroup(groupList);
final String expectedGroupRepresentation = jaxbManager
.marshalToXml(column);
final String actualGroupRepresentation = AggregateManager.determineGroupRepresentationForEvent(
statisticsConfig.getEvents().iterator().next(), mockEvent);
assertThat(actualGroupRepresentation,
is(equalTo(expectedGroupRepresentation)));
final StatsGroupingColumn actualGroupingColumn = AggregateManager
.determineGroupRepresentationForEvent(statisticsConfig
.getEvents().iterator().next(), mockEvent);
assertThat(actualGroupingColumn, is(equalTo(expectedGroupingColumn)));
}
}