Merge "Issue #1959 MADIS implementation peice in AWIPS II baseline" into development

Dustin Johnson 2013-05-02 12:54:37 -05:00 committed by Gerrit Code Review
commit 1485c27bc1
15 changed files with 587 additions and 209 deletions

View file

@ -329,15 +329,23 @@
<!-- OGC/DPA services -->
<include>ogc-common.xml</include>
<include>.*-ogc-request.xml</include>
<!-- Purge OGC/DPA registered plugins -->
<include>purge-spring.xml</include>
<include>ogc-purge.xml</include>
</mode>
<!-- This is a sample reference implementation -->
<!-- This is the MADIS implementation of dataprovideragent -->
<mode name="dataprovideragent">
<includeMode>dataProviderAgentTemplate</includeMode>
<!-- pointdata/obs specific services -->
<!-- pointdata/madis specific services -->
<include>madis-common.xml</include>
<include>pointdata-common.xml</include>
<include>madis-dpa-ingest.xml</include>
<include>madis-ogc.xml</include>
<!-- pointdata/obs specific services
<include>obs-common.xml</include>
<include>pointdata-common.xml</include>
<include>obs-dpa-ingest.xml</include>
<include>obs-ogc.xml</include>
<include>obs-ogc.xml</include>-->
<!-- grid specific services
<include>grib-common.xml</include>
<include>grid-staticdata-process.xml</include>

View file

@ -65,6 +65,9 @@ public class ConfigLayer implements ISerializableObject {
@XmlElement(name = "maxy", required = true)
private Double maxy;
@XmlElement(name = "crs", required = true)
private String crs;
@XmlElements({ @XmlElement(name = "parameter", type = Parameter.class, required = true) })
private List<Parameter> parameters;
@ -127,4 +130,12 @@ public class ConfigLayer implements ISerializableObject {
}
return parm;
}
public void setCrs(String crs) {
this.crs = crs;
}
public String getCrs() {
return crs;
}
}
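
The new crs element completes the JAXB binding for a layer's geographic constraint. Below is a minimal, self-contained sketch of how such an element unmarshals; LayerSketch and CrsBindingDemo are hypothetical stand-ins for illustration, not the real ConfigLayer:

import java.io.StringReader;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

public class CrsBindingDemo {

    // Hypothetical, trimmed-down stand-in for ConfigLayer.
    @XmlRootElement(name = "layer")
    @XmlAccessorType(XmlAccessType.FIELD)
    public static class LayerSketch {
        @XmlAttribute
        private String name;
        @XmlElement(name = "crs", required = true)
        private String crs;

        public String getCrs() {
            return crs;
        }
    }

    public static void main(String[] args) throws Exception {
        String xml = "<layer name=\"madis\"><crs>crs:84</crs></layer>";
        LayerSketch layer = (LayerSketch) JAXBContext
                .newInstance(LayerSketch.class).createUnmarshaller()
                .unmarshal(new StringReader(xml));
        System.out.println(layer.getCrs()); // prints crs:84
    }
}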

View file

@ -18,6 +18,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 12 Sept, 2012 1038 dhladky Initial creation
* 1 May 2013 1959 dhladky remove backup registry references
*
* </pre>
*
@ -33,18 +34,6 @@ public class HarvesterConfig implements ISerializableObject {
@DynamicSerializeElement
private Provider provider;
@XmlElement(name = "primaryRegistryHost")
@DynamicSerializeElement
private String primaryRegistryHost;
@XmlElement(name = "secondaryRegistryHost")
@DynamicSerializeElement
private String secondaryRegistryHost;
@XmlElement(name = "tertiaryRegistryHost")
@DynamicSerializeElement
private String tertiaryRegistryHost;
@XmlElement(name = "agent")
@DynamicSerializeElement
private Agent agent;
@ -57,40 +46,16 @@ public class HarvesterConfig implements ISerializableObject {
return agent;
}
public String getPrimaryRegistryHost() {
return primaryRegistryHost;
}
public Provider getProvider() {
return provider;
}
public String getSecondaryRegistryHost() {
return secondaryRegistryHost;
}
public String getTertiaryRegistryHost() {
return tertiaryRegistryHost;
}
public void setAgent(Agent agent) {
this.agent = agent;
}
public void setPrimaryRegistryHost(String primaryRegistryHost) {
this.primaryRegistryHost = primaryRegistryHost;
}
public void setProvider(Provider provider) {
this.provider = provider;
}
public void setSecondaryRegistryHost(String secondaryRegistryHost) {
this.secondaryRegistryHost = secondaryRegistryHost;
}
public void setTertiaryRegistryHost(String tertiaryRegistryHost) {
this.tertiaryRegistryHost = tertiaryRegistryHost;
}
}

View file

@ -32,7 +32,8 @@ Require-Bundle: org.eclipse.core.runtime,
com.raytheon.uf.edex.registry.ebxml;bundle-version="1.0.0",
com.raytheon.uf.common.registry.ebxml;bundle-version="1.0.0",
com.raytheon.uf.common.stats;bundle-version="1.0.0",
com.raytheon.uf.common.datadelivery.harvester;bundle-version="1.0.0"
com.raytheon.uf.common.datadelivery.harvester;bundle-version="1.0.0",
com.raytheon.uf.edex.purgesrv;bundle-version="1.12.1174"
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Bundle-ActivationPolicy: lazy
Export-Package: com.raytheon.uf.edex.datadelivery.harvester;uses:="com.raytheon.uf.edex.datadelivery.harvester.crawler",

View file

@ -0,0 +1,14 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<bean id="purgeManager" class="com.raytheon.uf.edex.datadelivery.harvester.purge.OGCPurgeManager">
<property name="clusterLimit" value="${purge.clusterlimit}"/>
<property name="serverLimit" value="${purge.serverlimit}"/>
<property name="deadPurgeJobAge" value="${purge.deadjobage}"/>
<property name="purgeFrequency" value="${purge.frequency}"/>
<property name="fatalFailureCount" value="${purge.fatalfailurecount}"/>
<property name="purgeEnabled" value="${purge.enabled}"/>
</bean>
</beans>
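
The ${purge.*} placeholders above resolve from EDEX purge properties at startup. The keys below are the ones this bean references; the values are illustrative only and may differ from the baseline's shipped defaults:

# illustrative values; the baseline's shipped defaults may differ
purge.clusterlimit=6
purge.serverlimit=2
purge.deadjobage=20
purge.frequency=30
purge.fatalfailurecount=3
purge.enabled=true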

View file

@ -0,0 +1,67 @@
package com.raytheon.uf.edex.datadelivery.harvester.purge;
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
import java.util.ArrayList;
import java.util.List;
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.purgesrv.PurgeManager;
/*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 11, 2013 #1959 dhladky Our own OGC purger
*
* </pre>
*
* @author dhladky
* @version 1.0
*/
public class OGCPurgeManager extends PurgeManager {
/**
* Creates a new PurgeManager
*/
protected OGCPurgeManager() {
}
/**
* Executes the purge only on available plugins that are registered with
* Camel. This works better for the DPA instances that will be running it:
* they aren't required to purge data from plugins that aren't registered
* to their JVM, which would be highly wasteful in this instance.
*/
public void executePurge() {
// check for any new plugins or database being purged and needing
// entries recreated
List<String> availablePlugins = new ArrayList<String>(PluginRegistry
.getInstance().getRegisteredObjects());
purgeRunner(availablePlugins);
}
}

View file

@ -0,0 +1,293 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<harvester xmlns:ns2="com.raytheon.uf.common.datadelivery.registry"
xmlns:ns4="http://www.w3.org/1999/xlink" xmlns:ns3="urn:oasis:names:tc:ebxml-regrep:xsd:rim:4.0"
xmlns:ns9="urn:oasis:names:tc:ebxml-regrep:xsd:lcm:4.0" xmlns:ns5="http://www.w3.org/2005/08/addressing"
xmlns:ns6="urn:oasis:names:tc:ebxml-regrep:xsd:rs:4.0" xmlns:ns7="urn:oasis:names:tc:ebxml-regrep:xsd:query:4.0"
xmlns:ns8="urn:oasis:names:tc:ebxml-regrep:xsd:spi:4.0">
<provider serviceType="WFS" name="MADISOGC">
<connection>
<!-- for OGC it's your FQDN
!!!!!PLACE YOUR URL HERE!!!!!!-->
<url>http://your.url.here:8085</url>
</connection>
<providerType>Point</providerType>
<projection type="LatLon">
<name>MadisLatLon</name>
<description>MADIS Test LatLon Coverage</description>
</projection>
</provider>
<agent xsi:type="ogcAgent" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<dateFormat>HHddMMMyyyy</dateFormat>
<layer name="madis">
<!-- Geographic constraint of madis layer data -->
<minx>-120.0</minx>
<maxx>-70.0</maxx>
<miny>20.0</miny>
<maxy>50.0</maxy>
<crs>crs:84</crs>
<!-- Registry Parameters in madis layer -->
<parameter name="timeObs" providerName="OBSTIME"
definition="Time of observation" dataType="POINT" baseType="String">
<levelType levelType="SFC" />
</parameter>
<parameter name="provider" providerName="PVDR"
definition="Provider Network" dataType="POINT" baseType="String">
<levelType levelType="SFC" />
</parameter>
<parameter name="sub_provider" providerName="SUBPVDR"
definition="Sub Provider Network" dataType="POINT" baseType="String">
<levelType levelType="SFC" />
</parameter>
<parameter name="restriction" providerName="RES"
definition="Restriction Level" dataType="POINT" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="dataset" providerName="DS"
definition="Dataset" dataType="POINT" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="obsLocation" providerName="obsLocation"
definition="Location of Observation" dataType="POINT" units="Degrees"
missingValue="-9999" fillValue="-9999" baseType="ObsLocation">
<levelType levelType="SFC" />
</parameter>
<parameter name="dewpoint" providerName="TD"
definition="Dewpoint Temperature" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="K">
<levelType levelType="SFC" />
</parameter>
<parameter name="dewpoint_qcd" providerName="TD_QCD"
definition="Dewpoint QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="dewpoint_qca" providerName="TD_QCA"
definition="Dewpoint QCD" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="dewpoint_qcr" providerName="TD_QCR"
definition="Dewpoint QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="rh" providerName="RH"
definition="Relative Humidity" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Integer" units="%">
<levelType levelType="SFC" />
</parameter>
<parameter name="rh_qcd" providerName="RH_QCD"
definition="Relative Humidity QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="rh_qca" providerName="RH_QCA"
definition="Relative Humidity QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="rh_qcr" providerName="RH_QCR"
definition="Relative Humidity QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="altimeter" providerName="ALTSE"
definition="Altimeter Setting" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Integer" units="Pa">
<levelType levelType="SFC" />
</parameter>
<parameter name="altimeter_qcd" providerName="ALTSE_QCD"
definition="Altimeter Setting QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="altimeter_qca" providerName="ALTSE_QCA"
definition="Altimeter Setting QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="altimeter_qcr" providerName="ALTSE_QCR"
definition="Altimeter Setting QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="temperature" providerName="T"
definition="Temperature" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="K">
<levelType levelType="SFC" />
</parameter>
<parameter name="temperature_qcd" providerName="T_QCD"
definition="Temperature QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="temperature_qca" providerName="T_QCA"
definition="Temperature QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="temperature_qcr" providerName="T_QCR"
definition="Temperature QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="windDirection" providerName="DD"
definition="Wind Direction" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Integer" units="degrees">
<levelType levelType="SFC" />
</parameter>
<parameter name="windDirection_qcd" providerName="DD_QCD"
definition="Wind Direction QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="windDirection_qca" providerName="DD_QCA"
definition="Wind Direction QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="windDirection_qcr" providerName="DD_QCR"
definition="Wind Direction QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="windSpeed" providerName="FF"
definition="Wind Speed" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="degrees">
<levelType levelType="SFC" />
</parameter>
<parameter name="windSpeed_qcd" providerName="FF_QCD"
definition="Wind Speed QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="windSpeed_qca" providerName="FF_QCA"
definition="Wind Speed QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="windSpeed_qcr" providerName="FF_QCR"
definition="Wind Speed QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="elevation" providerName="ELEV"
definition="Elevation" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="degrees">
<levelType levelType="SFC" />
</parameter>
<parameter name="elevation_qcd" providerName="ELEV_QCD"
definition="Elevation QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="elevation_qca" providerName="ELEV_QCA"
definition="Elevation QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="elevation_qcr" providerName="ELEV_QCR"
definition="Elevation QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="lon_qcd" providerName="LON_QCD"
definition="longitude QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="lon_qca" providerName="LON_QCA"
definition="Longitude QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="lon_qcr" providerName="LON_QCR"
definition="Longitude QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="lat_qcd" providerName="LAT_QCD"
definition="Latitude QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="lat_qca" providerName="LAT_QCA"
definition="Latitude QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="lat_qcr" providerName="LAT_QCR"
definition="Latitude QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipRate" providerName="PCPRATE"
definition="Precipitation Rate" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="in/hr">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipRate_qcd" providerName="PCPRATE_QCD"
definition="Precipitation Rate QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipRate_qca" providerName="PCPRATE_QCA"
definition="Precipitation Rate QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipRate_qcr" providerName="PCPRATE_QCR"
definition="Precipitation Rate QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="windGust" providerName="FFGUST"
definition="Wind Gust" dataType="POINT"
missingValue="-9999" fillValue="-9999" baseType="Float" units="kts">
<levelType levelType="SFC" />
</parameter>
<parameter name="windGust_qcd" providerName="FFGUST_QCD"
definition="Wind Gust QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="windGust_qca" providerName="FFGUST_QCA"
definition="Wind Gust QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="windGust_qcr" providerName="FFGUST_QCR"
definition="Wind Gust QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipitalWater" providerName="PWV"
definition="Precipital Water Vapor" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="in">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipitalWater_qcd" providerName="PWV_QCD"
definition="Precipital Water Vapor QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipitalWater_qca" providerName="PWV_QCA"
definition="Precipital Water Vapor QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="precipitalWater_qcr" providerName="PWV_QCR"
definition="Precipital Water Vapor QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="pressure" providerName="P"
definition="Pressure" dataType="POINT" missingValue="-9999"
fillValue="-9999" baseType="Float" units="Pa">
<levelType levelType="SFC" />
</parameter>
<parameter name="pressure_qcd" providerName="P_QCD"
definition="Pressure QCD" dataType="POINT" baseType="QCD">
<levelType levelType="SFC" />
</parameter>
<parameter name="pressure_qca" providerName="P_QCA"
definition="Pressure QCA" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
<parameter name="pressure_qcr" providerName="P_QCR"
definition="Pressure QCR" dataType="POINT" missingValue="0"
fillValue="0" baseType="Integer">
<levelType levelType="SFC" />
</parameter>
</layer>
</agent>
</harvester>
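
The minx/maxx/miny/maxy values above bound the madis layer in CRS:84 (longitude/latitude) coordinates. Below is a minimal sketch of the containment test that constraint implies; the class and sample point are illustrative, not harvester code:

public class LayerBoundsCheck {
    public static void main(String[] args) {
        // Bounds copied from the madis layer above (CRS:84, lon/lat order).
        double minx = -120.0, maxx = -70.0, miny = 20.0, maxy = 50.0;
        double lon = -96.0, lat = 41.3; // hypothetical Omaha-area observation
        boolean inside = lon >= minx && lon <= maxx
                && lat >= miny && lat <= maxy;
        System.out.println("observation inside layer bounds: " + inside);
    }
}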

View file

@ -29,14 +29,6 @@
</connection>
</provider>
<!-- Registry hosts for harvester(defaults to localhost) -->
<!-- primary -->
<primaryRegistryHost>127.0.0.1</primaryRegistryHost>
<!-- secondary backup -->
<secondaryRegistryHost>127.0.0.1</secondaryRegistryHost>
<!-- secondary backup -->
<tertiaryRegistryHost>127.0.0.1</tertiaryRegistryHost>
<agent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="crawlAgent">
<crawlDir>/awips2/crawl</crawlDir>
<dateFormat>HHddMMMyyyy</dateFormat>
@ -56,7 +48,7 @@
<!-- Ingest new collections found by Seed Scans? -->
<ingestNew>true</ingestNew>
<!-- listen to robots.txt denied directory lists? -->
<useRobots>true</useRobots>
<useRobots>false</useRobots>
<!-- (-1) is unlimited pages visited -->
<maxSeedPages>-1</maxSeedPages>
<maxMainPages>1000</maxMainPages>

View file

@ -10,3 +10,4 @@ Require-Bundle: com.raytheon.edex.common;bundle-version="1.11.26",
org.apache.commons.lang;bundle-version="2.3.0",
com.raytheon.uf.common.status;bundle-version="1.12.1174",
javax.persistence;bundle-version="1.0.0"
Export-Package: com.raytheon.uf.edex.purgesrv

View file

@ -0,0 +1,14 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<bean id="purgeManager" class="com.raytheon.uf.edex.purgesrv.PurgeManager">
<property name="clusterLimit" value="${purge.clusterlimit}"/>
<property name="serverLimit" value="${purge.serverlimit}"/>
<property name="deadPurgeJobAge" value="${purge.deadjobage}"/>
<property name="purgeFrequency" value="${purge.frequency}"/>
<property name="fatalFailureCount" value="${purge.fatalfailurecount}"/>
<property name="purgeEnabled" value="${purge.enabled}"/>
</bean>
</beans>

View file

@ -5,16 +5,10 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="purge" class="com.raytheon.uf.edex.purgesrv.PurgeSrv" />
<bean id="purgeManager" class="com.raytheon.uf.edex.purgesrv.PurgeManager" factory-method="getInstance">
<property name="clusterLimit" value="${purge.clusterlimit}"/>
<property name="serverLimit" value="${purge.serverlimit}"/>
<property name="deadPurgeJobAge" value="${purge.deadjobage}"/>
<property name="purgeFrequency" value="${purge.frequency}"/>
<property name="fatalFailureCount" value="${purge.fatalfailurecount}"/>
<property name="purgeEnabled" value="${purge.enabled}"/>
<bean id="purge" class="com.raytheon.uf.edex.purgesrv.PurgeSrv">
<constructor-arg ref = "purgeManager"/>
</bean>
<!-- Register database plugin -->
<bean id="purgeDatabaseProperties" class="com.raytheon.uf.edex.database.DatabasePluginProperties">
<property name="pluginFQN" value="com.raytheon.uf.edex.purgesrv" />

View file

@ -73,6 +73,8 @@ public class PurgeJob extends Thread {
/** Last time job has printed a timed out message */
private long lastTimeOutMessage = 0;
private PurgeManager purgeManager;
/**
* Creates a new Purge job for the specified plugin.
@ -82,11 +84,12 @@ public class PurgeJob extends Thread {
* @param purgeType
* The type of purge to be executed
*/
public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) {
public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType, PurgeManager purgeManager) {
// Give the thread a name
this.setName("Purge-" + pluginName.toUpperCase() + "-Thread");
this.pluginName = pluginName;
this.purgeType = purgeType;
this.purgeManager = purgeManager;
}
public void run() {
@ -143,7 +146,7 @@ public class PurgeJob extends Thread {
t = t.getCause();
}
} finally {
ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock();
ClusterTask purgeLock = purgeManager.getPurgeLock();
try {
/*
* Update the status accordingly if the purge failed or
@ -159,13 +162,11 @@ public class PurgeJob extends Thread {
} else {
if (failed) {
status.incrementFailedCount();
if (status.getFailedCount() >= PurgeManager
.getInstance().getFatalFailureCount()) {
if (status.getFailedCount() >= purgeManager.getFatalFailureCount()) {
PurgeLogger
.logFatal(
"Purger for this plugin has reached or exceeded consecutive failure limit of "
+ PurgeManager
.getInstance()
+ purgeManager
.getFatalFailureCount()
+ ". Data will no longer being purged for this plugin.",
pluginName);
@ -188,7 +189,7 @@ public class PurgeJob extends Thread {
* This purger thread has exceeded the time out duration but
* finally finished. Output a message and update the status
*/
int deadPurgeJobAge = PurgeManager.getInstance()
int deadPurgeJobAge = purgeManager
.getDeadPurgeJobAge();
Calendar purgeTimeOutLimit = Calendar.getInstance();
purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
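
The constructor change above swaps PurgeManager.getInstance() lookups for an injected reference, which is what allows a subclass such as OGCPurgeManager to stand in for the base class. The same refactor in miniature, with hypothetical names:

// Hypothetical sketch of the singleton-to-injection refactor.
class Manager {
    int getFatalFailureCount() {
        return 3; // illustrative limit
    }
}

class Worker {
    private final Manager manager; // was: Manager.getInstance() at each use

    Worker(Manager manager) {
        this.manager = manager; // any Manager subclass may be supplied
    }

    void check(int failures) {
        if (failures >= manager.getFatalFailureCount()) {
            System.out.println("failure limit reached");
        }
    }
}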

View file

@ -83,6 +83,7 @@ import com.raytheon.uf.edex.purgesrv.PurgeJob.PURGE_JOB_TYPE;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 18, 2012 #470 bphillip Initial creation
* Apr 11, 2013 #1959 dhladky Added method that only processes running plugins
*
* </pre>
*
@ -141,163 +142,176 @@ public class PurgeManager {
private PurgeDao dao = new PurgeDao();
private static PurgeManager instance = new PurgeManager();
public static PurgeManager getInstance() {
return instance;
}
/**
* Creates a new PurgeManager
*/
private PurgeManager() {
protected PurgeManager() {
}
/**
* Executes the purge routine
*/
public void executePurge() {
if (!purgeEnabled) {
PurgeLogger.logWarn(
"Data purging has been disabled. No data will be purged.",
null);
return;
}
/**
* Executes the purge routine
*/
public void executePurge() {
ClusterTask purgeMgrTask = getPurgeLock();
// Gets the list of plugins in ascending order by the last time they
// were purged
List<String> pluginList = dao.getPluginsByPurgeTime();
try {
// Prune the job map
Iterator<PurgeJob> iter = purgeJobs.values().iterator();
while (iter.hasNext()) {
if (!iter.next().isAlive()) {
iter.remove();
}
}
// check for any new plugins or database being purged and needing
// entries recreated
Set<String> availablePlugins = new HashSet<String>(PluginRegistry
.getInstance().getRegisteredObjects());
Calendar purgeTimeOutLimit = Calendar.getInstance();
purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
Calendar purgeFrequencyLimit = Calendar.getInstance();
purgeFrequencyLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
purgeFrequencyLimit.add(Calendar.MINUTE, -purgeFrequency);
// Merge the lists
availablePlugins.removeAll(pluginList);
// Gets the list of plugins in ascending order by the last time they
// were purged
List<String> pluginList = dao.getPluginsByPurgeTime();
if (availablePlugins.size() > 0) {
// generate new list with them at the beginning
List<String> newSortedPlugins = new ArrayList<String>(
availablePlugins);
Collections.sort(newSortedPlugins);
newSortedPlugins.addAll(pluginList);
pluginList = newSortedPlugins;
}
// check for any new plugins or database being purged and needing
// entries recreated
Set<String> availablePlugins = new HashSet<String>(PluginRegistry
.getInstance().getRegisteredObjects());
purgeRunner(pluginList);
}
// Merge the lists
availablePlugins.removeAll(pluginList);
/**
* The guts of the actual purge process
* @param pluginList
*/
protected void purgeRunner(List<String> pluginList) {
if (availablePlugins.size() > 0) {
// generate new list with them at the beginning
List<String> newSortedPlugins = new ArrayList<String>(
availablePlugins);
Collections.sort(newSortedPlugins);
newSortedPlugins.addAll(pluginList);
pluginList = newSortedPlugins;
}
if (!purgeEnabled) {
PurgeLogger.logWarn(
"Data purging has been disabled. No data will be purged.",
null);
return;
}
ClusterTask purgeMgrTask = getPurgeLock();
try {
// Prune the job map
Iterator<PurgeJob> iter = purgeJobs.values().iterator();
while (iter.hasNext()) {
if (!iter.next().isAlive()) {
iter.remove();
}
}
Calendar purgeTimeOutLimit = Calendar.getInstance();
purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
Calendar purgeFrequencyLimit = Calendar.getInstance();
purgeFrequencyLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
purgeFrequencyLimit.add(Calendar.MINUTE, -purgeFrequency);
boolean canPurge = true;
int jobsStarted = 0;
int maxNumberOfJobsToStart = Math.min(
clusterLimit
- dao.getRunningClusterJobs(
purgeTimeOutLimit.getTime(),
fatalFailureCount), serverLimit
- getNumberRunningJobsOnServer(purgeTimeOutLimit));
for (String plugin : pluginList) {
try {
// initialize canPurge based on number of jobs started
canPurge = jobsStarted < maxNumberOfJobsToStart;
PurgeJob jobThread = purgeJobs.get(plugin);
PurgeJobStatus job = dao.getJobForPlugin(plugin);
boolean canPurge = true;
int jobsStarted = 0;
int maxNumberOfJobsToStart = Math.min(
clusterLimit
- dao.getRunningClusterJobs(
purgeTimeOutLimit.getTime(),
fatalFailureCount), serverLimit
- getNumberRunningJobsOnServer(purgeTimeOutLimit));
if (!pluginList.isEmpty()) {
for (String plugin : pluginList) {
try {
// initialize canPurge based on number of jobs started
canPurge = jobsStarted < maxNumberOfJobsToStart;
PurgeJob jobThread = purgeJobs.get(plugin);
PurgeJobStatus job = dao.getJobForPlugin(plugin);
if (job == null) {
// no job in database, generate empty job
if (job == null) {
// no job in database, generate empty job
try {
job = new PurgeJobStatus();
job.setPlugin(plugin);
job.setFailedCount(0);
job.setRunning(false);
job.setStartTime(new Date(0));
dao.create(job);
} catch (Throwable e) {
PurgeLogger.logError(
"Failed to create new purge job entry",
plugin, e);
}
}
try {
job = new PurgeJobStatus();
job.setPlugin(plugin);
job.setFailedCount(0);
job.setRunning(false);
job.setStartTime(new Date(0));
dao.create(job);
} catch (Throwable e) {
PurgeLogger.logError(
"Failed to create new purge job entry",
plugin, e);
}
}
// Check to see if this job has met the fatal failure count
if (job.getFailedCount() >= fatalFailureCount) {
canPurge = false;
PurgeLogger
.logFatal(
"Purger for this plugin has reached or exceeded consecutive failure limit of "
+ fatalFailureCount
+ ". Data will no longer being purged for this plugin.",
plugin);
}
// Check to see if this job has met the fatal failure
// count
if (job.getFailedCount() >= fatalFailureCount) {
canPurge = false;
PurgeLogger
.logFatal(
"Purger for this plugin has reached or exceeded consecutive failure limit of "
+ fatalFailureCount
+ ". Data will no longer being purged for this plugin.",
plugin);
}
// is purge job currently running on this server
if (jobThread != null) {
// job currently running on our server, don't start
// another
canPurge = false;
// is purge job currently running on this server
if (jobThread != null) {
// job currently running on our server, don't start
// another
canPurge = false;
if (purgeTimeOutLimit.getTimeInMillis() > jobThread
.getStartTime()) {
jobThread.printTimedOutMessage(deadPurgeJobAge);
}
} else {
if (job.isRunning()) {
// check if job has timed out
if (purgeTimeOutLimit.getTime().before(
job.getStartTime())) {
canPurge = false;
}
// else if no one else sets canPurge = false will
// start purging on this server
} else {
// not currently running, check if need to be purged
Date startTime = job.getStartTime();
if (startTime != null
&& startTime.after(purgeFrequencyLimit
.getTime())) {
canPurge = false;
}
}
}
if (purgeTimeOutLimit.getTimeInMillis() > jobThread
.getStartTime()) {
jobThread.printTimedOutMessage(deadPurgeJobAge);
}
} else {
if (job.isRunning()) {
// check if job has timed out
if (purgeTimeOutLimit.getTime().before(
job.getStartTime())) {
canPurge = false;
}
// else if no one else sets canPurge = false
// will
// start purging on this server
} else {
// not currently running, check if need to be
// purged
Date startTime = job.getStartTime();
if (startTime != null
&& startTime.after(purgeFrequencyLimit
.getTime())) {
canPurge = false;
}
}
}
if (canPurge) {
purgeJobs.put(plugin, purgeExpiredData(plugin));
jobsStarted++;
}
} catch (Throwable e) {
PurgeLogger
.logError(
"An unexpected error occured during the purge job check for plugin",
plugin, e);
}
}
} catch (Throwable e) {
PurgeLogger
.logError(
"An unexpected error occured during the data purge process",
StatusConstants.CATEGORY_PURGE, e);
} finally {
// Unlock the purge task to allow other servers to run.
ClusterLockUtils.unlock(purgeMgrTask, false);
// PurgeLogger.logInfo(getPurgeStatus(true), null);
}
}
if (canPurge) {
purgeJobs.put(plugin, purgeExpiredData(plugin));
jobsStarted++;
}
} catch (Throwable e) {
PurgeLogger
.logError(
"An unexpected error occured during the purge job check for plugin",
plugin, e);
}
}
}
} catch (Throwable e) {
PurgeLogger
.logError(
"An unexpected error occured during the data purge process",
StatusConstants.CATEGORY_PURGE, e);
} finally {
// Unlock the purge task to allow other servers to run.
ClusterLockUtils.unlock(purgeMgrTask, false);
// PurgeLogger.logInfo(getPurgeStatus(true), null);
}
}
@SuppressWarnings("unused")
private String getPurgeStatus(boolean verbose) {
@ -417,7 +431,7 @@ public class PurgeManager {
*/
public PurgeJob purgeExpiredData(String plugin) {
dao.startJob(plugin);
PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_EXPIRED);
PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_EXPIRED, this);
job.start();
return job;
}
@ -433,7 +447,7 @@ public class PurgeManager {
*/
public PurgeJob purgeAllData(String plugin) {
dao.startJob(plugin);
PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_ALL);
PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_ALL, this);
job.start();
return job;
}

View file

@ -73,12 +73,15 @@ public class PurgeSrv {
/** The purge cron message */
public static final String PURGE_CRON = "PURGE_CRON";
private PurgeManager purgeManager;
/**
* Constructs a new PurgeSrv. This method verifies the metadata database has
* been constructed and exports the schema if necessary
*/
public PurgeSrv() {
public PurgeSrv(PurgeManager purgeManager) {
this.purgeManager = purgeManager;
}
public void purgeCron() throws Exception {
@ -126,10 +129,10 @@ public class PurgeSrv {
purgeAllData();
} else if (message.startsWith(DELETE_PLUGIN_DATA)) {
String pluginToPurge = message.replace(DELETE_PLUGIN_DATA, "");
PurgeManager.getInstance().purgeExpiredData(pluginToPurge);
purgeManager.purgeExpiredData(pluginToPurge);
} else if (message.startsWith(DELETE_ALL_PLUGIN_DATA)) {
String pluginToPurge = message.replace(DELETE_ALL_PLUGIN_DATA, "");
PurgeManager.getInstance().purgeAllData(pluginToPurge);
purgeManager.purgeAllData(pluginToPurge);
} else if (message.equals(PURGE_CRON)
|| message.equals(DELETE_EXPIRED_DATA)) {
purgeExpiredData();
@ -160,7 +163,7 @@ public class PurgeSrv {
.getInstance().getRegisteredObjects());
for (String pluginName : availablePlugins) {
if (PluginRegistry.getInstance().getRegisteredObject(pluginName) != null) {
PurgeManager.getInstance().purgeAllData(pluginName);
purgeManager.purgeAllData(pluginName);
}
}
PurgeLogger.logInfo("Purge All Data Completed at: " + new Date(),
@ -183,7 +186,7 @@ public class PurgeSrv {
for (String pluginName : availablePlugins) {
if (PluginRegistry.getInstance().getRegisteredObject(pluginName) != null) {
PurgeManager.getInstance().purgeExpiredData(pluginName);
purgeManager.purgeExpiredData(pluginName);
}
}
}