12.5.1-15 baseline

Former-commit-id: 1c128372ef9f478f1b807e8d0f23207d095bfc65
parent: 8f3c9ebfb7
commit: d5c238de07
19 changed files with 2030 additions and 1904 deletions
0   RadarServer/com.raytheon.rcm.lib/src/com/raytheon/rcm/config/awips1/Awips1RpsListUtil.java   Executable file → Normal file
0   RadarServer/com.raytheon.rcm.server/src/com/raytheon/rcm/config/awips1/Awips1ConfigProvider.java   Executable file → Normal file
72  TextDao.java   Normal file
@@ -0,0 +1,72 @@
+/**
+ * This software was developed and / or modified by Raytheon Company,
+ * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
+ *
+ * U.S. EXPORT CONTROLLED TECHNICAL DATA
+ * This software product contains export-restricted data whose
+ * export/transfer/disclosure is restricted by U.S. law. Dissemination
+ * to non-U.S. persons whether in the United States or abroad requires
+ * an export license or other authorization.
+ *
+ * Contractor Name:        Raytheon Company
+ * Contractor Address:     6825 Pine Street, Suite 340
+ *                         Mail Stop B8
+ *                         Omaha, NE 68106
+ *                         402.291.0100
+ *
+ * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
+ * further licensing information.
+ **/
+package com.raytheon.edex.plugin.text.dao;
+
+import java.util.Calendar;
+
+import com.raytheon.edex.db.dao.DefaultPluginDao;
+import com.raytheon.edex.textdb.dbapi.impl.TextDB;
+import com.raytheon.uf.common.dataplugin.PluginException;
+import com.raytheon.uf.edex.database.purge.PurgeLogger;
+
+/**
+ * DAO for text products
+ *
+ * <pre>
+ *
+ * SOFTWARE HISTORY
+ *
+ * Date          Ticket#    Engineer    Description
+ * ------------  ---------- ----------- --------------------------
+ * Jul 10, 2009  2191       rjpeter     Update retention time handling.
+ * Aug 18, 2009  2191       rjpeter     Changed to version purging.
+ * </pre>
+ *
+ * @author
+ * @version 1
+ */
+public class TextDao extends DefaultPluginDao {
+
+    public TextDao(String pluginName) throws PluginException {
+        super(pluginName);
+    }
+
+    @Override
+    public void purgeAllData() {
+        logger.warn("purgeAllPluginData not implemented for text. No data will be purged.");
+    }
+
+    protected void loadScripts() throws PluginException {
+        // no op
+    }
+
+    public void purgeExpiredData() throws PluginException {
+        int deletedRecords = 0;
+
+        // only do full purge every few hours since incremental purge runs every
+        // minute
+        if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) {
+            TextDB.purgeStdTextProducts();
+        }
+
+        PurgeLogger.logInfo("Purged " + deletedRecords + " items total.",
+                "text");
+    }
+}
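Reviewer note (not part of the commit): purgeExpiredData above is driven by a purge route that, per its own comment, fires every minute; the HOUR_OF_DAY % 3 == 0 check therefore runs the full TextDB purge on every invocation during hours 00, 03, ..., 21. A minimal sketch of the gate in isolation, assuming only java.util.Calendar:

    import java.util.Calendar;

    /** Sketch: hour-of-day gate for the expensive full text purge. */
    final class PurgeGate {
        /** True during every third hour (00, 03, ..., 21). */
        static boolean isFullPurgeHour() {
            return Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0;
        }
    }

Because the gate tests only the hour, every minutely invocation inside a qualifying hour still triggers a full purge; adding a minute check would narrow that if it ever matters.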
0   cave/build/static/linux/cave/caveEnvironment/lib/libgempak.so   Executable file → Normal file
@@ -21,6 +21,7 @@ package com.raytheon.uf.viz.alertviz;
 
 import java.io.PrintStream;
 import java.io.StringWriter;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.jms.ExceptionListener;
@@ -88,7 +89,11 @@ public class AlertVizClient implements MessageListener {
 
     private CopyOnWriteArrayList<IAlertVizMessageCallback> listeners;
 
-    private Marshaller marshaller;
+    private static int POOL_SIZE = 5;
+
+    private java.util.Queue<Marshaller> marshallers = new ConcurrentLinkedQueue<Marshaller>();
+
+    private JAXBContext jaxbContext;
 
     private static AlertVizClient instance;
 
@@ -134,8 +139,7 @@ public class AlertVizClient implements MessageListener {
             this.consumer.setMessageListener(this);
             reconnect = false;
             lastReconnectTime = System.currentTimeMillis();
-            JAXBContext context = JAXBContext.newInstance(StatusMessage.class);
-            marshaller = context.createMarshaller();
+            jaxbContext = JAXBContext.newInstance(StatusMessage.class);
         } catch (JMSException e) {
             reconnect = true;
             throw new AlertvizException("Unable to connect to notification", e);
@@ -158,8 +162,11 @@ public class AlertVizClient implements MessageListener {
         if (retryOnExceptions == false && reconnect == true) {
             printToConsole(statusMessage);
         } else {
+            Marshaller marshaller = null;
+
             try {
                 StringWriter sw = new StringWriter();
+                marshaller = getMarshaller();
                 marshaller.marshal(statusMessage, sw);
                 ActiveMQTextMessage message = new ActiveMQTextMessage();
                 message.setText(sw.toString());
@@ -178,9 +185,21 @@ public class AlertVizClient implements MessageListener {
             } catch (Exception e) {
                 throw new AlertvizException("Error sending message", e);
             }
+
+            if (marshaller != null && marshallers.size() < POOL_SIZE) {
+                marshallers.add(marshaller);
+            }
         }
     }
 
+    private Marshaller getMarshaller() throws JAXBException {
+        Marshaller m = marshallers.poll();
+        if (m == null) {
+            m = jaxbContext.createMarshaller();
+        }
+        return m;
+    }
+
     /**
      * @param statusMessage
      */
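Reviewer note (not part of the commit): this change replaces the single shared JAXB Marshaller with a bounded pool, since Marshaller instances are not thread-safe and messages can be sent from multiple threads. A self-contained sketch of the same pattern (names are illustrative):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import javax.xml.bind.JAXBContext;
    import javax.xml.bind.JAXBException;
    import javax.xml.bind.Marshaller;

    /** Sketch: bounded pool of JAXB marshallers for concurrent senders. */
    public class MarshallerPool {
        private static final int POOL_SIZE = 5;

        private final JAXBContext context;

        private final Queue<Marshaller> pool = new ConcurrentLinkedQueue<Marshaller>();

        public MarshallerPool(Class<?>... boundClasses) throws JAXBException {
            this.context = JAXBContext.newInstance(boundClasses);
        }

        /** Reuse a pooled marshaller when available, else create a fresh one. */
        public Marshaller borrow() throws JAXBException {
            Marshaller m = pool.poll();
            return (m != null) ? m : context.createMarshaller();
        }

        /** Return the marshaller, capping the pool at roughly POOL_SIZE. */
        public void release(Marshaller m) {
            if (m != null && pool.size() < POOL_SIZE) {
                pool.add(m);
            }
        }
    }

The size() check before add() can race and let the pool briefly grow slightly past POOL_SIZE, which is harmless here.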
@@ -188,38 +188,41 @@ public class Container implements IConfigurationChangedListener {
     private boolean isShotGun(StatusMessage message) {
         boolean retVal = false;
         if (lastMessage != null) {
+            final long shotgunMessageCheckTime = this.shotgunMessageStartTime == 0 ? this.lastMessage
+                    .getEventTime().getTime() : this.shotgunMessageStartTime;
+
             if (this.lastMessage.getCategory().equals(message.getCategory())
                     && this.lastMessage.getPriority() == message.getPriority()
                     && this.lastMessage.getMessage().equals(
                             message.getMessage())
-                    && Math.abs(this.lastMessage.getEventTime().getTime()
-                            - message.getEventTime().getTime()) < SHOTGUN_MESSAGE_MILLISECOND_THRESHOLD) {
+                    && (Math.abs(message.getEventTime().getTime()
+                            - shotgunMessageCheckTime) < SHOTGUN_MESSAGE_MILLISECOND_THRESHOLD)) {
                 retVal = true;
                 ++this.shotgunMessageCount;
                 if (this.shotgunMessageStartTime == 0) {
-                    this.shotgunMessageStartTime = message.getEventTime()
+                    this.shotgunMessageStartTime = lastMessage.getEventTime()
                             .getTime();
                 }
             } else {
                 if (this.shotgunMessageCount != 0) {
+                    if (this.shotgunMessageCount > 1) {
                         StringBuilder sb = new StringBuilder("Received ")
                                 .append(this.shotgunMessageCount)
                                 .append(" duplicate messages in ")
-                                .append(message.getEventTime().getTime()
+                                .append(this.lastMessage.getEventTime().getTime()
                                         - this.shotgunMessageStartTime)
                                 .append(" milliseconds. For message: ")
                                 .append(this.lastMessage.getCategory()).append(":")
                                 .append(this.lastMessage.getSourceKey())
                                 .append(" ").append(this.lastMessage.getMessage());
-                        this.shotgunMessageStartTime = 0;
-                        this.shotgunMessageCount = 0;
                         StatusMessage sm = new StatusMessage(
                                 this.lastMessage.getSourceKey(), "GDN_ADMIN",
                                 this.lastMessage.getPriority(),
                                 this.lastMessage.getPlugin(), sb.toString(), null);
                         sm.setEventTime(SimulatedTime.getSystemTime().getTime());
-                        messageReceived(sm);
+                        logInternal(sm);
+                    }
+                    this.shotgunMessageStartTime = 0;
+                    this.shotgunMessageCount = 1;
                 }
             }
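Reviewer note (not part of the commit): the isShotGun rework anchors the duplicate-message window to the start of the burst (shotgunMessageStartTime) rather than the previous message's event time, so a steady drip of duplicates can no longer extend the window indefinitely, and the summary is emitted only when more than one duplicate was actually absorbed. A rough sketch of that windowing logic, with hypothetical names:

    /** Sketch: duplicate-burst detection anchored at the window start. */
    class BurstDetector {
        private static final long WINDOW_MS = 5000; // assumed threshold

        private String lastText;
        private long windowStart; // 0 means no open window
        private int count = 0;

        /** True when text duplicates the previous message inside the window. */
        boolean isBurst(String text, long eventTimeMs) {
            long anchor = (windowStart == 0) ? eventTimeMs : windowStart;
            boolean dup = text.equals(lastText)
                    && Math.abs(eventTimeMs - anchor) < WINDOW_MS;
            if (dup) {
                if (windowStart == 0) {
                    windowStart = eventTimeMs; // first duplicate opens the window
                }
                count++;
            } else {
                windowStart = 0; // burst over; next message starts fresh
                count = 1;
            }
            lastText = text;
            return dup;
        }
    }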
0   cave/com.raytheon.viz.aviation/src/com/raytheon/viz/aviation/monitor/TafSiteComp.java   Executable file → Normal file
@@ -86,6 +86,7 @@ import com.raytheon.viz.ui.dialogs.CaveSWTDialog;
  * 06/28/2010   4639       cjeanbap    Allow user to create a new text product.
  *
  * 01/26/2012   14468      D.Friedman  Fix initial BBB field selection.
+ * 05/30/2012   15046      D.Friedman  Always set addressee field to ALL.
  * </pre>
  *
  * @author lvenable
@@ -448,7 +449,7 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements
         addresseeTF.setTextLimit(4);
         addresseeTF.setLayoutData(rd);
         // Set the "default" addressee to "ALL".
-        addresseeTF.setText(parentEditor.getAddressee());
+        addresseeTF.setText("ALL");
 
         // When the number of characters enter reaches the max limit and
         // the caret position is at the end then switch focus to the next
@@ -459,16 +460,8 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements
                         .getTextLimit()) {
                     wmoTtaaiiTF.setFocus();
                 }
 
-                // If the user changes the text in the addressee text field
-                // then "untoggle" the toggle buttons.
-                if (addresseeTF.getText().compareTo("000") != 0
-                        && addresseeTF.getText().compareTo("DEF") != 0
-                        && addresseeTF.getText().compareTo("ALL") != 0) {
-                    zerosBtn.setSelection(false);
-                    defBtn.setSelection(false);
-                    allBtn.setSelection(false);
-                }
-
+                handleAddresseeModified();
             }
         });
@@ -518,6 +511,7 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements
                 addresseeTF.setText("ALL");
             }
         });
+        handleAddresseeModified();
 
         Label sepLbl = new Label(shell, SWT.SEPARATOR | SWT.HORIZONTAL);
         sepLbl.setLayoutData(new GridData(GridData.FILL_HORIZONTAL));
@@ -945,4 +939,16 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements
             }
         });
     }
+
+    private void handleAddresseeModified() {
+        // If the user changes the text in the addressee text field
+        // then update the toggle buttons.
+        String addressee = addresseeTF.getText();
+        if (zerosBtn != null)
+            zerosBtn.setSelection("000".equals(addressee));
+        if (defBtn != null)
+            defBtn.setSelection("DEF".equals(addressee));
+        if (allBtn != null)
+            allBtn.setSelection("ALL".equals(addressee));
+    }
 }
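Reviewer note (not part of the commit): the repeated "untoggle" logic is centralized in handleAddresseeModified(), which keeps each toggle button in sync with the exact field value instead of only clearing all three on unknown input. A sketch of the same sync idea with hypothetical names:

    import java.util.Map;

    /** Sketch: keep one toggle per reserved addressee in sync with a text value. */
    final class AddresseeToggleSync {
        /** buttons maps a reserved addressee ("000", "DEF", "ALL") to its toggle. */
        static void sync(String addressee, Map<String, ToggleButton> buttons) {
            for (Map.Entry<String, ToggleButton> e : buttons.entrySet()) {
                // At most one button is selected, and only on an exact match.
                e.getValue().setSelected(e.getKey().equals(addressee));
            }
        }

        /** Minimal stand-in for an SWT-style toggle button. */
        interface ToggleButton {
            void setSelected(boolean selected);
        }
    }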
0   cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/TextEditorDialog.java   Normal file → Executable file
@@ -1734,13 +1734,19 @@ public class WarngenLayer extends AbstractStormTrackResource {
         Matcher m = tmlPtrn.matcher(rawMessage);
 
         if (m.find()) {
-            int day = warnRecord.getIssueTime().get(Calendar.DAY_OF_MONTH);
-            int hour = Integer.parseInt(m.group(1));
-            int minute = Integer.parseInt(m.group(2));
+            Calendar issueTime = warnRecord.getIssueTime();
+            int day = issueTime.get(Calendar.DAY_OF_MONTH);
+            int tmlHour = Integer.parseInt(m.group(1));
+            // This is for the case when the warning text was created,
+            // but actually issued the next day.
+            if (tmlHour > issueTime.get(Calendar.HOUR_OF_DAY)) {
+                day--;
+            }
+            int tmlMinute = Integer.parseInt(m.group(2));
             frameTime = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
             frameTime.set(Calendar.DAY_OF_MONTH, day);
-            frameTime.set(Calendar.HOUR_OF_DAY, hour);
-            frameTime.set(Calendar.MINUTE, minute);
+            frameTime.set(Calendar.HOUR_OF_DAY, tmlHour);
+            frameTime.set(Calendar.MINUTE, tmlMinute);
         } else {
             frameTime = warnRecord.getIssueTime();
         }
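Reviewer note (not part of the commit): the TML parsing fix handles a product composed shortly before 00Z but issued after it: the HHMM in the text then carries a larger hour than the issue time, so the day of month must be stepped back. A sketch of the corrected computation:

    import java.util.Calendar;
    import java.util.TimeZone;

    /** Sketch: build the frame time for a TIME...MOT...LOC (TML) HHMM string. */
    final class TmlTime {
        static Calendar frameTime(Calendar issueTime, int tmlHour, int tmlMinute) {
            int day = issueTime.get(Calendar.DAY_OF_MONTH);
            // A TML hour later than the issue hour means the text was written
            // on the previous UTC day; roll the day back.
            if (tmlHour > issueTime.get(Calendar.HOUR_OF_DAY)) {
                day--;
            }
            Calendar frame = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
            frame.set(Calendar.DAY_OF_MONTH, day);
            frame.set(Calendar.HOUR_OF_DAY, tmlHour);
            frame.set(Calendar.MINUTE, tmlMinute);
            return frame;
        }
    }

Edge case worth noting: day-- on the 1st of the month yields day 0, which a lenient Calendar (the default) normalizes into the last day of the previous month.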
0   deltaScripts/12.5.1/drop_gfe_tables.sh   Normal file → Executable file
2   edexOsgi/build.edex/esb/data/utility/common_static/base/warngen/specialMarineWarningFollowup.vm   Normal file → Executable file
@@ -43,6 +43,7 @@
 #if(${action}!="CANCON")
 ${WMOId} ${vtecOffice} 000000 ${BBBId}
 MWS${siteId}
+
 #if(${productClass}=="T")
 TEST...MARINE WEATHER STATEMENT...TEST
 #else
@@ -478,6 +479,7 @@ REPORT SEVERE WEATHER TO THE COAST GUARD OR NEAREST LAW ENFORCEMENT AGENCY. THEY
 #if(${action}=="CANCON")
 ${WMOId} ${vtecOffice} 000000 ${BBBId}
 MWS${siteId}
+
 #if(${productClass}=="T")
 TEST...MARINE WEATHER STATEMENT...TEST
 #else
@@ -450,7 +450,7 @@ public class LockManager {
         for (int i = 0; i < locks.size() - 1; i++) {
             currentLock = locks.get(i);
             nextLock = locks.get(i + 1);
-            if (currentLock.getEndTime() >= nextLock.getStartTime()) {
+            if (currentLock.getEndTime() >= nextLock.getStartTime() && currentLock.getWsId().equals(nextLock.getWsId())) {
                 lockCombined = true;
                 deleted.add(currentLock);
                 deleted.add(nextLock);
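Reviewer note (not part of the commit): the combine condition now also requires matching workstation IDs, so overlapping locks held by different users are no longer merged into one. The predicate in isolation, with a minimal stand-in for the lock type:

    /** Sketch: adjacent locks merge only when they overlap AND share an owner. */
    final class LockMerge {
        static boolean canCombine(Lock current, Lock next) {
            return current.getEndTime() >= next.getStartTime()
                    && current.getWsId().equals(next.getWsId());
        }

        /** Minimal stand-in for the real lock entity. */
        interface Lock {
            long getStartTime();
            long getEndTime();
            String getWsId();
        }
    }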
File diff suppressed because it is too large
@@ -1,63 +1,72 @@
 /**
  * This software was developed and / or modified by Raytheon Company,
  * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
  *
  * U.S. EXPORT CONTROLLED TECHNICAL DATA
  * This software product contains export-restricted data whose
  * export/transfer/disclosure is restricted by U.S. law. Dissemination
  * to non-U.S. persons whether in the United States or abroad requires
  * an export license or other authorization.
  *
  * Contractor Name:        Raytheon Company
  * Contractor Address:     6825 Pine Street, Suite 340
  *                         Mail Stop B8
  *                         Omaha, NE 68106
  *                         402.291.0100
  *
  * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
  * further licensing information.
  **/
 package com.raytheon.edex.plugin.text.dao;
 
+import java.util.Calendar;
+
 import com.raytheon.edex.db.dao.DefaultPluginDao;
 import com.raytheon.edex.textdb.dbapi.impl.TextDB;
 import com.raytheon.uf.common.dataplugin.PluginException;
 import com.raytheon.uf.edex.database.purge.PurgeLogger;
 
 /**
  * DAO for text products
  *
  * <pre>
  *
  * SOFTWARE HISTORY
  *
  * Date          Ticket#    Engineer    Description
  * ------------  ---------- ----------- --------------------------
  * Jul 10, 2009  2191       rjpeter     Update retention time handling.
  * Aug 18, 2009  2191       rjpeter     Changed to version purging.
  * </pre>
  *
  * @author
  * @version 1
  */
 public class TextDao extends DefaultPluginDao {
 
     public TextDao(String pluginName) throws PluginException {
         super(pluginName);
     }
 
     @Override
     public void purgeAllData() {
         logger.warn("purgeAllPluginData not implemented for text. No data will be purged.");
     }
 
     protected void loadScripts() throws PluginException {
         // no op
     }
 
     public void purgeExpiredData() throws PluginException {
-        int deletedRecords = TextDB.purgeStdTextProducts();
+        int deletedRecords = 0;
+
+        // only do full purge every few hours since incremental purge runs every
+        // minute
+        if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) {
+            TextDB.purgeStdTextProducts();
+        }
+
         PurgeLogger.logInfo("Purged " + deletedRecords + " items total.",
                 "text");
     }
 }
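Reviewer note (not part of the commit): the old code assigned the return of TextDB.purgeStdTextProducts() to deletedRecords; the new code discards it, so the "Purged N items total" log line now always reports 0. A sketch that keeps both the three-hour gating and an accurate count:

    // Sketch, not the committed code: keep the count so the log stays accurate.
    public void purgeExpiredData() throws PluginException {
        int deletedRecords = 0;

        // Full purge only every third hour; the purge route fires every minute.
        if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) {
            deletedRecords = TextDB.purgeStdTextProducts();
        }

        PurgeLogger.logInfo("Purged " + deletedRecords + " items total.", "text");
    }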
@@ -1,284 +1,282 @@
 /**
  * This software was developed and / or modified by Raytheon Company,
  * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
  *
  * U.S. EXPORT CONTROLLED TECHNICAL DATA
  * This software product contains export-restricted data whose
  * export/transfer/disclosure is restricted by U.S. law. Dissemination
  * to non-U.S. persons whether in the United States or abroad requires
  * an export license or other authorization.
  *
  * Contractor Name:        Raytheon Company
  * Contractor Address:     6825 Pine Street, Suite 340
  *                         Mail Stop B8
  *                         Omaha, NE 68106
  *                         402.291.0100
  *
  * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
  * further licensing information.
  **/
 package com.raytheon.uf.edex.purgesrv;
 
 import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
 import org.hibernate.Query;
 import org.hibernate.Session;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
 
 import com.raytheon.uf.edex.database.dao.CoreDao;
 import com.raytheon.uf.edex.database.dao.DaoConfig;
 
 /**
  *
  * Data access object for accessing purge job status objects
  *
  * <pre>
  *
  * SOFTWARE HISTORY
  *
  * Date         Ticket#    Engineer    Description
  * ------------ ---------- ----------- --------------------------
  * May 1, 2012  #470       bphillip    Initial creation
  *
  * </pre>
  *
  * @author bphillip
  * @version 1.0
  */
 public class PurgeDao extends CoreDao {
 
     /**
      * Constructs a new purge data access object
      */
     public PurgeDao() {
         super(DaoConfig.forClass(PurgeJobStatus.class));
     }
 
     /**
      * Gets the number of purge jobs currently running on the cluster. A job is
      * considered running if the 'running' flag is set to true and the job has
      * been started since validStartTime and has not met or exceeded the failed
      * count.
      *
      * @param validStartTime
      * @param failedCount
      * @return The number of purge jobs currently running on the cluster
      */
     public int getRunningClusterJobs(final Date validStartTime,
             final int failedCount) {
         final String query = "from "
                 + daoClass.getName()
                 + " obj where obj.running = true and obj.startTime > :startTime and obj.failedCount <= :failedCount";
         return (Integer) txTemplate.execute(new TransactionCallback() {
             @Override
             public Object doInTransaction(TransactionStatus status) {
                 Query hibQuery = getSession(false).createQuery(query);
                 hibQuery.setTimestamp("startTime", validStartTime);
                 hibQuery.setInteger("failedCount", failedCount);
                 List<?> queryResult = hibQuery.list();
                 if (queryResult == null) {
                     return 0;
                 } else {
                     return queryResult.size();
                 }
             }
         });
     }
 
     /**
      * Returns the jobs that have met or exceed the failed count.
      *
      * @param failedCount
      * @return
      */
     @SuppressWarnings("unchecked")
     public List<PurgeJobStatus> getFailedJobs(final int failedCount) {
         final String query = "from " + daoClass.getName()
                 + " obj where obj.failedCount >= :failedCount";
         return (List<PurgeJobStatus>) txTemplate
                 .execute(new TransactionCallback() {
                     @Override
                     public List<PurgeJobStatus> doInTransaction(
                             TransactionStatus status) {
                         Query hibQuery = getSession(false).createQuery(query);
                         hibQuery.setInteger("failedCount", failedCount);
                         return hibQuery.list();
                     }
                 });
     }
 
     @SuppressWarnings("unchecked")
     public List<PurgeJobStatus> getTimedOutJobs(final Date validStartTime) {
         final String query = "from "
                 + daoClass.getName()
                 + " obj where obj.running = true and obj.startTime <= :startTime";
         return (List<PurgeJobStatus>) txTemplate
                 .execute(new TransactionCallback() {
                     @Override
                     public List<PurgeJobStatus> doInTransaction(
                             TransactionStatus status) {
                         Query hibQuery = getSession(false).createQuery(query);
                         hibQuery.setTimestamp("startTime", validStartTime);
                         return hibQuery.list();
                     }
                 });
     }
 
     @SuppressWarnings("unchecked")
     public Map<String, List<PurgeJobStatus>> getRunningServerJobs() {
         final String query = "from "
                 + daoClass.getName()
                 + " obj where obj.running = true and obj.timedOut = false and obj.failed = false and obj.id.server=':SERVER'";
         return (Map<String, List<PurgeJobStatus>>) txTemplate
                 .execute(new TransactionCallback() {
                     @Override
                     public Map<String, List<PurgeJobStatus>> doInTransaction(
                             TransactionStatus status) {
                         Map<String, List<PurgeJobStatus>> serverMap = new HashMap<String, List<PurgeJobStatus>>();
                         Query serverQuery = getSession(false).createQuery(
                                 "select distinct obj.id.server from "
                                         + daoClass.getName()
                                         + " obj order by obj.id.server asc");
                         List<String> result = serverQuery.list();
                         for (String server : result) {
                             Query query2 = getSession(false).createQuery(
                                     query.replace(":SERVER", server));
                             serverMap.put(server, query2.list());
                         }
                         return serverMap;
                     }
                 });
     }
 
     /**
      * Gets the amount of time in milliseconds since the last purge of a given
      * plugin
      *
      * @param plugin
      *            The plugin name
      * @return Number of milliseconds since the purge job was run for the given
      *         plugin
      */
     public long getTimeSinceLastPurge(String plugin) {
         final String query = "select obj.startTime from " + daoClass.getName()
                 + " obj where obj.id.plugin='" + plugin + "'";
         return (Long) txTemplate.execute(new TransactionCallback() {
             @Override
             public Object doInTransaction(TransactionStatus status) {
                 Query hibQuery = getSession(false).createQuery(query);
                 Timestamp queryResult = (Timestamp) hibQuery.uniqueResult();
                 if (queryResult == null) {
                     return -1;
                 } else {
                     return System.currentTimeMillis() - queryResult.getTime();
                 }
             }
         });
     }
 
     /**
      * Gets the purge job status for a plugin
      *
      * @param plugin
      *            The plugin to get the purge job status for
      * @return The purge job statuses
      */
     public PurgeJobStatus getJobForPlugin(String plugin) {
         final String query = "from " + daoClass.getName()
                 + " obj where obj.id.plugin='" + plugin + "'";
         return (PurgeJobStatus) txTemplate.execute(new TransactionCallback() {
             @Override
             public PurgeJobStatus doInTransaction(TransactionStatus status) {
                 Query hibQuery = getSession(false).createQuery(query);
                 PurgeJobStatus queryResult = (PurgeJobStatus) hibQuery
                         .uniqueResult();
                 return queryResult;
             }
         });
     }
 
     /**
      * Sets a purge job to running status and sets the startTime to current
      * time. If was previously running, will increment the failed count.
      *
      * @param plugin
      *            The plugin row to update
      */
     public void startJob(final String plugin) {
         final String query = "from " + daoClass.getName()
                 + " obj where obj.id.plugin='" + plugin + "'";
         txTemplate.execute(new TransactionCallback() {
             @Override
             public PurgeJobStatus doInTransaction(TransactionStatus status) {
                 Session sess = getSession(false);
                 Query hibQuery = sess.createQuery(query);
                 PurgeJobStatus queryResult = (PurgeJobStatus) hibQuery
                         .uniqueResult();
                 if (queryResult == null) {
                     queryResult = new PurgeJobStatus();
                     queryResult.setFailedCount(0);
                     queryResult.setPlugin(plugin);
                     queryResult.setRunning(false);
                     sess.save(queryResult);
                 }
 
-                // any changes to PurgeJobStatus will be commited at end of
-                // transaction
                 if (queryResult.isRunning()) {
                     // query was previously running, update failed count
                     queryResult.incrementFailedCount();
                 }
 
                 queryResult.setStartTime(Calendar.getInstance(
                         TimeZone.getTimeZone("GMT")).getTime());
                 queryResult.setRunning(true);
-
+                sess.update(queryResult);
                 return queryResult;
             }
         });
     }
 
     /**
      * Retrieves the plugins order by startTime.
      *
      * @param latestStartTime
      * @param failedCount
      * @return
      */
     @SuppressWarnings("unchecked")
     public List<String> getPluginsByPurgeTime() {
         final String query = "select obj.id.plugin from " + daoClass.getName()
                 + " obj order by obj.startTime asc, obj.plugin asc";
         return (List<String>) txTemplate.execute(new TransactionCallback() {
             @Override
             public List<String> doInTransaction(TransactionStatus status) {
                 Query hibQuery = getSession(false).createQuery(query);
                 List<String> result = (List<String>) hibQuery.list();
                 return result;
             }
         });
     }
 
     /**
      * Updates a purge job status object
      *
      * @param jobStatus
      *            The object to update
      */
     public void update(final PurgeJobStatus jobStatus) {
         txTemplate.execute(new TransactionCallbackWithoutResult() {
             @Override
             public void doInTransactionWithoutResult(TransactionStatus status) {
                 getHibernateTemplate().update(jobStatus);
             }
         });
     }
 }
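Reviewer note (not part of the commit): several queries in this DAO splice the plugin name directly into the HQL string ("... obj.id.plugin='" + plugin + "'"). Binding it as a parameter avoids quoting bugs and injection; a sketch of the same lookup using the Query API already used elsewhere in this class (incidentally, the original returns the int -1 on a miss, which would fail the (Long) cast at the call site; returning -1L avoids that):

    // Sketch: parameterized form of the getTimeSinceLastPurge-style lookup.
    public long getTimeSinceLastPurge(final String plugin) {
        return (Long) txTemplate.execute(new TransactionCallback() {
            @Override
            public Object doInTransaction(TransactionStatus status) {
                Query q = getSession(false).createQuery(
                        "select obj.startTime from " + daoClass.getName()
                                + " obj where obj.id.plugin = :plugin");
                q.setString("plugin", plugin); // bound, not concatenated
                Timestamp last = (Timestamp) q.uniqueResult();
                return (last == null) ? -1L
                        : System.currentTimeMillis() - last.getTime();
            }
        });
    }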
@ -1,299 +1,302 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.purgesrv;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||
import com.raytheon.uf.edex.database.plugin.PluginDao;
|
||||
import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
||||
import com.raytheon.uf.edex.database.purge.PurgeLogger;
|
||||
|
||||
/**
|
||||
*
|
||||
* This class encapsulates the purge activity for a plugin into a cluster task.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Apr 19, 2012 #470 bphillip Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1.0
|
||||
*/
|
||||
public class PurgeJob extends Thread {
|
||||
|
||||
/** The type of purge */
|
||||
public enum PURGE_JOB_TYPE {
|
||||
PURGE_ALL, PURGE_EXPIRED
|
||||
}
|
||||
|
||||
private long startTime;
|
||||
|
||||
/** The cluster task name to use for purge jobs */
|
||||
public static final String TASK_NAME = "Purge Plugin Data";
|
||||
|
||||
/** The plugin associated with this purge job */
|
||||
private String pluginName;
|
||||
|
||||
/** The type of purge job being executed */
|
||||
private PURGE_JOB_TYPE purgeType;
|
||||
|
||||
/** Last time job has printed a timed out message */
|
||||
private long lastTimeOutMessage = 0;
|
||||
|
||||
/**
|
||||
* Creates a new Purge job for the specified plugin.
|
||||
*
|
||||
* @param pluginName
|
||||
* The plugin to be purged
|
||||
* @param purgeType
|
||||
* The type of purge to be executed
|
||||
*/
|
||||
public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) {
|
||||
// Give the thread a name
|
||||
this.setName("Purge-" + pluginName.toUpperCase() + "-Thread");
|
||||
this.pluginName = pluginName;
|
||||
this.purgeType = purgeType;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
// Flag used to track if this job has failed
|
||||
boolean failed = false;
|
||||
startTime = System.currentTimeMillis();
|
||||
PurgeLogger.logInfo("Purging expired data...", pluginName);
|
||||
PluginDao dao = null;
|
||||
|
||||
try {
|
||||
dao = PluginFactory.getInstance().getPluginDao(pluginName);
|
||||
if (dao.getDaoClass() != null) {
|
||||
dao.purgeExpiredData();
|
||||
PurgeLogger.logInfo("Data successfully Purged!", pluginName);
|
||||
} else {
|
||||
Method m = dao.getClass().getMethod("purgeExpiredData",
|
||||
new Class[] {});
|
||||
if (m != null) {
|
||||
if (m.getDeclaringClass().equals(PluginDao.class)) {
|
||||
PurgeLogger
|
||||
.logWarn(
|
||||
"Unable to purge data. This plugin does not specify a record class and does not implement a custom purger.",
|
||||
pluginName);
|
||||
} else {
|
||||
if (this.purgeType.equals(PURGE_JOB_TYPE.PURGE_EXPIRED)) {
|
||||
dao.purgeExpiredData();
|
||||
} else {
|
||||
dao.purgeAllData();
|
||||
}
|
||||
PurgeLogger.logInfo("Data successfully Purged!",
|
||||
pluginName);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
failed = true;
|
||||
// keep getting next exceptions with sql exceptions to ensure
|
||||
// we can see the underlying error
|
||||
PurgeLogger
|
||||
.logError("Error purging expired data!\n", pluginName, e);
|
||||
Throwable t = e.getCause();
|
||||
while (t != null) {
|
||||
if (t instanceof SQLException) {
|
||||
SQLException se = ((SQLException) t).getNextException();
|
||||
PurgeLogger.logError("Next exception:", pluginName, se);
|
||||
}
|
||||
t = t.getCause();
|
||||
}
|
||||
} finally {
|
||||
ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock();
|
||||
try {
|
||||
/*
|
||||
* Update the status accordingly if the purge failed or
|
||||
* succeeded
|
||||
*/
|
||||
PurgeDao purgeDao = new PurgeDao();
|
||||
PurgeJobStatus status = purgeDao
|
||||
.getJobForPlugin(this.pluginName);
|
||||
if (status == null) {
|
||||
PurgeLogger.logError(
|
||||
"Purge job completed but no status object found!",
|
||||
this.pluginName);
|
||||
} else {
|
||||
if (failed) {
|
||||
status.incrementFailedCount();
|
||||
if (status.getFailedCount() >= PurgeManager
|
||||
.getInstance().getFatalFailureCount()) {
|
||||
PurgeLogger
|
||||
.logFatal(
|
||||
"Purger for this plugin has reached or exceeded consecutive failure limit of "
|
||||
+ PurgeManager
|
||||
.getInstance()
|
||||
.getFatalFailureCount()
|
||||
+ ". Data will no longer being purged for this plugin.",
|
||||
pluginName);
|
||||
} else {
|
||||
PurgeLogger.logError("Purge job has failed "
|
||||
+ status.getFailedCount()
|
||||
+ " consecutive times.", this.pluginName);
|
||||
// Reset the start time so we can try again as soon
|
||||
// as possible
|
||||
status.setStartTime(new Date(0));
|
||||
}
|
||||
} else {
|
||||
status.setFailedCount(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* This purger thread has exceeded the time out duration but
|
||||
* finally finished. Output a message and update the status
|
||||
*/
|
||||
int deadPurgeJobAge = PurgeManager.getInstance()
|
||||
.getDeadPurgeJobAge();
|
||||
Calendar purgeTimeOutLimit = Calendar.getInstance();
|
||||
purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||
purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
|
||||
if (startTime < purgeTimeOutLimit.getTimeInMillis()) {
|
||||
PurgeLogger
|
||||
.logInfo(
|
||||
"Purge job has recovered from timed out state!!",
|
||||
pluginName);
|
||||
}
|
||||
status.setRunning(false);
|
||||
purgeDao.update(status);
|
||||
/*
|
||||
* Log execution times
|
||||
*/
|
||||
long executionTime = getAge();
|
||||
long execTimeInMinutes = executionTime / 60000;
|
||||
if (execTimeInMinutes > 0) {
|
||||
PurgeLogger.logInfo("Purge run time: " + executionTime
|
||||
+ " ms (" + execTimeInMinutes + " minutes)",
|
||||
this.pluginName);
|
||||
} else {
|
||||
PurgeLogger.logInfo("Purge run time: " + executionTime
|
||||
+ " ms", this.pluginName);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
PurgeLogger
|
||||
.logError(
|
||||
"An unexpected error occurred upon completion of the purge job",
|
||||
this.pluginName, e);
|
||||
} finally {
|
||||
ClusterLockUtils.unlock(purgeLock, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void printTimedOutMessage(int deadPurgeJobAge) {
|
||||
// only print message every 5 minutes
|
||||
if (System.currentTimeMillis() - lastTimeOutMessage > 300000) {
|
||||
PurgeLogger.logFatal(
|
||||
"Purger running time has exceeded timeout duration of "
|
||||
+ deadPurgeJobAge
|
||||
+ " minutes. Current running time: "
|
||||
+ (getAge() / 60000) + " minutes", pluginName);
|
||||
printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints the stack trace for this job thread.
|
||||
*/
|
||||
public void printStackTrace() {
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
buffer.append("Stack trace for Purge Job Thread:\n");
|
||||
buffer.append(getStackTrace(this));
|
||||
// If this thread is blocked, output the stack traces for the other
|
||||
// blocked threads to assist in determining the source of the
|
||||
// deadlocked
|
||||
// threads
|
||||
if (this.getState().equals(State.BLOCKED)) {
|
||||
buffer.append("\tDUMPING OTHER BLOCKED THREADS\n");
|
||||
buffer.append(getBlockedStackTraces());
|
||||
|
||||
}
|
||||
PurgeLogger.logError(buffer.toString(), this.pluginName);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the stack traces for all other threads in the BLOCKED state in the
|
||||
* JVM
|
||||
*
|
||||
* @return The stack traces for all other threads in the BLOCKED state in
|
||||
* the JVM
|
||||
*/
|
||||
private String getBlockedStackTraces() {
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
Map<Thread, StackTraceElement[]> threads = Thread.getAllStackTraces();
|
||||
for (Thread t : threads.keySet()) {
|
||||
if (t.getState().equals(State.BLOCKED)) {
|
||||
if (t.getId() != this.getId()) {
|
||||
buffer.append(getStackTrace(t));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the stack trace for the given thread
|
||||
*
|
||||
* @param thread
|
||||
* The thread to get the stack trace for
|
||||
* @return The stack trace as a String
|
||||
*/
|
||||
private String getStackTrace(Thread thread) {
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
StackTraceElement[] stack = Thread.getAllStackTraces().get(thread);
|
||||
buffer.append("\tThread ID: ").append(thread.getId())
|
||||
.append(" Thread state: ").append(this.getState())
|
||||
.append("\n");
|
||||
if (stack == null) {
|
||||
buffer.append("No stack trace could be retrieved for this thread");
|
||||
} else {
|
||||
for (int i = 0; i < stack.length; i++) {
|
||||
buffer.append("\t\t").append(stack[i]).append("\n");
|
||||
}
|
||||
}
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
public long getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
|
||||
public long getAge() {
|
||||
return System.currentTimeMillis() - startTime;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.purgesrv;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||
import com.raytheon.uf.edex.database.plugin.PluginDao;
|
||||
import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
||||
import com.raytheon.uf.edex.database.purge.PurgeLogger;
|
||||
|
||||
/**
|
||||
*
|
||||
* This class encapsulates the purge activity for a plugin into a cluster task.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Apr 19, 2012 #470 bphillip Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1.0
|
||||
*/
|
||||
public class PurgeJob extends Thread {
|
||||
|
||||
/** The type of purge */
|
||||
public enum PURGE_JOB_TYPE {
|
||||
PURGE_ALL, PURGE_EXPIRED
|
||||
}
|
||||
|
||||
private long startTime;
|
||||
|
||||
/** The cluster task name to use for purge jobs */
|
||||
public static final String TASK_NAME = "Purge Plugin Data";
|
||||
|
||||
/** The plugin associated with this purge job */
|
||||
private String pluginName;
|
||||
|
||||
/** The type of purge job being executed */
|
||||
private PURGE_JOB_TYPE purgeType;
|
||||
|
||||
/** Last time job has printed a timed out message */
|
||||
private long lastTimeOutMessage = 0;
|
||||
|
||||
/**
|
||||
* Creates a new Purge job for the specified plugin.
|
||||
*
|
||||
* @param pluginName
|
||||
* The plugin to be purged
|
||||
* @param purgeType
|
||||
* The type of purge to be executed
|
||||
*/
|
||||
public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) {
|
||||
// Give the thread a name
|
||||
this.setName("Purge-" + pluginName.toUpperCase() + "-Thread");
|
||||
this.pluginName = pluginName;
|
||||
this.purgeType = purgeType;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
// Flag used to track if this job has failed
|
||||
boolean failed = false;
|
||||
startTime = System.currentTimeMillis();
|
||||
PurgeLogger.logInfo("Purging expired data...", pluginName);
|
||||
PluginDao dao = null;
|
||||
|
||||
try {
|
||||
dao = PluginFactory.getInstance().getPluginDao(pluginName);
|
||||
if (dao.getDaoClass() != null) {
|
||||
dao.purgeExpiredData();
|
||||
PurgeLogger.logInfo("Data successfully Purged!", pluginName);
|
||||
} else {
|
||||
Method m = dao.getClass().getMethod("purgeExpiredData",
|
||||
new Class[] {});
|
||||
if (m != null) {
|
||||
if (m.getDeclaringClass().equals(PluginDao.class)) {
|
||||
PurgeLogger
|
||||
.logWarn(
|
||||
"Unable to purge data. This plugin does not specify a record class and does not implement a custom purger.",
|
||||
pluginName);
|
||||
} else {
|
||||
if (this.purgeType.equals(PURGE_JOB_TYPE.PURGE_EXPIRED)) {
|
||||
dao.purgeExpiredData();
|
||||
} else {
|
||||
dao.purgeAllData();
|
||||
}
|
||||
PurgeLogger.logInfo("Data successfully Purged!",
|
||||
pluginName);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
failed = true;
|
||||
// keep getting next exceptions with sql exceptions to ensure
|
||||
// we can see the underlying error
|
||||
PurgeLogger
|
||||
.logError("Error purging expired data!\n", pluginName, e);
|
||||
Throwable t = e.getCause();
|
||||
while (t != null) {
|
||||
if (t instanceof SQLException) {
|
||||
SQLException se = ((SQLException) t).getNextException();
|
||||
PurgeLogger.logError("Next exception:", pluginName, se);
|
||||
}
|
||||
t = t.getCause();
|
||||
}
|
||||
} finally {
|
||||
ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock();
|
||||
try {
|
||||
/*
|
||||
* Update the status accordingly if the purge failed or
|
||||
* succeeded
|
||||
*/
|
||||
PurgeDao purgeDao = new PurgeDao();
|
||||
PurgeJobStatus status = purgeDao
|
||||
.getJobForPlugin(this.pluginName);
|
||||
if (status == null) {
|
||||
PurgeLogger.logError(
|
||||
"Purge job completed but no status object found!",
|
||||
this.pluginName);
|
||||
} else {
|
||||
if (failed) {
|
||||
status.incrementFailedCount();
|
||||
if (status.getFailedCount() >= PurgeManager
|
||||
.getInstance().getFatalFailureCount()) {
|
||||
PurgeLogger
|
||||
.logFatal(
|
||||
"Purger for this plugin has reached or exceeded consecutive failure limit of "
|
||||
+ PurgeManager
|
||||
.getInstance()
|
||||
.getFatalFailureCount()
|
||||
+ ". Data will no longer being purged for this plugin.",
|
||||
pluginName);
|
||||
} else {
|
||||
PurgeLogger.logError("Purge job has failed "
|
||||
+ status.getFailedCount()
|
||||
+ " consecutive times.", this.pluginName);
|
||||
// Back the start time off by half an hour to try to
|
||||
// purgin soon, don't want to start immediately so
|
||||
// it doesn't ping pong between servers in a time
|
||||
// out scenario
|
||||
Date startTime = status.getStartTime();
|
||||
startTime.setTime(startTime.getTime() - (1800000));
|
||||
}
|
||||
} else {
|
||||
status.setFailedCount(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* This purger thread has exceeded the time out duration but
|
||||
* finally finished. Output a message and update the status
|
||||
*/
|
||||
int deadPurgeJobAge = PurgeManager.getInstance()
|
||||
.getDeadPurgeJobAge();
|
||||
Calendar purgeTimeOutLimit = Calendar.getInstance();
|
||||
purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||
purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
|
||||
if (startTime < purgeTimeOutLimit.getTimeInMillis()) {
|
||||
PurgeLogger
|
||||
.logInfo(
|
||||
"Purge job has recovered from timed out state!!",
|
||||
pluginName);
|
||||
}
|
||||
status.setRunning(false);
|
||||
purgeDao.update(status);
|
||||
/*
|
||||
* Log execution times
|
||||
*/
|
||||
long executionTime = getAge();
|
||||
long execTimeInMinutes = executionTime / 60000;
|
||||
if (execTimeInMinutes > 0) {
|
||||
PurgeLogger.logInfo("Purge run time: " + executionTime
|
||||
+ " ms (" + execTimeInMinutes + " minutes)",
|
||||
this.pluginName);
|
||||
} else {
|
||||
PurgeLogger.logInfo("Purge run time: " + executionTime
|
||||
+ " ms", this.pluginName);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
PurgeLogger
|
||||
.logError(
|
||||
"An unexpected error occurred upon completion of the purge job",
|
||||
this.pluginName, e);
|
||||
} finally {
|
||||
ClusterLockUtils.unlock(purgeLock, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void printTimedOutMessage(int deadPurgeJobAge) {
|
||||
// only print message every 5 minutes
|
||||
if (System.currentTimeMillis() - lastTimeOutMessage > 300000) {
|
||||
PurgeLogger.logFatal(
|
||||
"Purger running time has exceeded timeout duration of "
|
||||
+ deadPurgeJobAge
|
||||
+ " minutes. Current running time: "
|
||||
+ (getAge() / 60000) + " minutes", pluginName);
|
||||
printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
    /**
     * Logs the stack trace for this job thread to the purge log.
     */
    public void printStackTrace() {
        StringBuffer buffer = new StringBuffer();
        buffer.append("Stack trace for Purge Job Thread:\n");
        buffer.append(getStackTrace(this));
        // If this thread is blocked, output the stack traces for the other
        // blocked threads to assist in determining the source of the deadlock
        if (this.getState().equals(State.BLOCKED)) {
            buffer.append("\tDUMPING OTHER BLOCKED THREADS\n");
            buffer.append(getBlockedStackTraces());
        }
        PurgeLogger.logError(buffer.toString(), this.pluginName);
    }

    /**
     * Gets the stack traces for all other threads in the BLOCKED state in the
     * JVM
     *
     * @return The stack traces for all other threads in the BLOCKED state in
     *         the JVM
     */
    private String getBlockedStackTraces() {
        StringBuffer buffer = new StringBuffer();
        Map<Thread, StackTraceElement[]> threads = Thread.getAllStackTraces();
        for (Thread t : threads.keySet()) {
            if (t.getState().equals(State.BLOCKED)) {
                if (t.getId() != this.getId()) {
                    buffer.append(getStackTrace(t));
                }
            }
        }
        return buffer.toString();
    }

    /**
     * Gets the stack trace for the given thread
     *
     * @param thread
     *            The thread to get the stack trace for
     * @return The stack trace as a String
     */
    private String getStackTrace(Thread thread) {
        StringBuffer buffer = new StringBuffer();
        StackTraceElement[] stack = Thread.getAllStackTraces().get(thread);
        // Report the state of the thread being dumped, not this job thread
        buffer.append("\tThread ID: ").append(thread.getId())
                .append(" Thread state: ").append(thread.getState())
                .append("\n");
        if (stack == null) {
            buffer.append("No stack trace could be retrieved for this thread");
        } else {
            for (int i = 0; i < stack.length; i++) {
                buffer.append("\t\t").append(stack[i]).append("\n");
            }
        }
        return buffer.toString();
    }

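    /**
     * @return The time this purge job started, in milliseconds since the epoch
     */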
    public long getStartTime() {
        return startTime;
    }

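    /**
     * @return How long this purge job has been running, in milliseconds
     */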
    public long getAge() {
        return System.currentTimeMillis() - startTime;
    }
}
@@ -1,480 +1,488 @@
/**
 * This software was developed and / or modified by Raytheon Company,
 * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
 *
 * U.S. EXPORT CONTROLLED TECHNICAL DATA
 * This software product contains export-restricted data whose
 * export/transfer/disclosure is restricted by U.S. law. Dissemination
 * to non-U.S. persons whether in the United States or abroad requires
 * an export license or other authorization.
 *
 * Contractor Name:        Raytheon Company
 * Contractor Address:     6825 Pine Street, Suite 340
 *                         Mail Stop B8
 *                         Omaha, NE 68106
 *                         402.291.0100
 *
 * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
 * further licensing information.
 **/
package com.raytheon.uf.edex.purgesrv;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;

import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.purge.PurgeLogger;
import com.raytheon.uf.edex.database.status.StatusConstants;
import com.raytheon.uf.edex.purgesrv.PurgeJob.PURGE_JOB_TYPE;

/**
 *
 * Object for managing purge jobs. The purge manager relies on the purgejobs
 * table to coordinate information. The executePurge() method on this class is
 * executed every minute via a quartz timer defined in the purge-spring.xml
 * Spring configuration file.
 * <p>
 * The purge manager is designed to adhere to the following rules:
 * <p>
 * · The cluster may have no more than 6 purge jobs running simultaneously by
 * default. This property is configurable in the project.properties file<br>
 * · Any given server may have no more than 2 purge jobs running simultaneously
 * by default. This property is configurable in the project.properties file<br>
 * · A purge job for a plugin is considered 'hung' if it has been running for
 * more than 20 minutes by default. This property is configurable in the
 * project.properties file<br>
 * · If a purge job that was previously determined to be hung actually finishes
 * its execution, the cluster lock is updated appropriately and the purge job
 * is able to resume normal operation. This is in place so that if a hung purge
 * process goes unnoticed for a period of time, the server will still try to
 * recover autonomously if it can.<br>
 * · If a purge job is determined to be hung, the stack trace for the thread
 * executing the job is output to the log. Furthermore, if the job is in the
 * BLOCKED state, the stack traces for all other BLOCKED threads are output to
 * the purge log as part of a rudimentary deadlock detection strategy to be used
 * by personnel attempting to remedy the situation.<br>
 * · By default, a fatal condition occurs if a given plugin's purge job fails 3
 * consecutive times.<br>
 * · If a purge job hangs on one server in the cluster, it will try to run on
 * another cluster member at the next purge interval.<br>
 * · If the purge manager attempts to purge a plugin that has been running for
 * longer than the 20 minute threshold, it is considered a failure, and the
 * failure count is updated.
 * <p>
 *
 * <pre>
 *
 * SOFTWARE HISTORY
 *
 * Date         Ticket#    Engineer    Description
 * ------------ ---------- ----------- --------------------------
 * Apr 18, 2012 #470       bphillip    Initial creation
 *
 * </pre>
 *
 * @author bphillip
 * @version 1.0
 */
public class PurgeManager {

    /** Purge Manager task name */
    private static final String PURGE_TASK_NAME = "Purge Manager";

    /** Purge Manager task details */
    private static final String PURGE_TASK_DETAILS = "Purge Manager Job";

    /** Purge Manager task override timeout. Currently 2 minutes */
    private static final long PURGE_MANAGER_TIMEOUT = 120000;

    /**
     * The cluster limit property to be set via Spring with the value defined
     * in project.properties
     */
    private int clusterLimit = 6;

    /**
     * The server limit property to be set via Spring with the value defined
     * in project.properties
     */
    private int serverLimit = 2;

    /**
     * The time in minutes at which a purge job is considered 'dead' or
     * 'hung'. Set via Spring with the value defined in project.properties
     */
    private int deadPurgeJobAge = 20;

    /**
     * The frequency, in minutes, at which a plugin may be purged. Set via
     * Spring with the value defined in project.properties
     */
    private int purgeFrequency = 60;

    /**
     * How many times a purger is allowed to fail before it is considered
     * fatal. Set via Spring with the value defined in project.properties
     */
    private int fatalFailureCount = 3;

    /**
     * The master switch defined in project.properties that enables and
     * disables data purging
     */
    private boolean purgeEnabled = true;

    /** Map of purge jobs */
    private Map<String, PurgeJob> purgeJobs = new ConcurrentHashMap<String, PurgeJob>();

    private PurgeDao dao = new PurgeDao();

    private static PurgeManager instance = new PurgeManager();

    public static PurgeManager getInstance() {
        return instance;
    }

    /**
     * Creates a new PurgeManager
     */
    private PurgeManager() {
    }

    /**
     * Executes the purge routine
     */
    public void executePurge() {
        if (!purgeEnabled) {
            PurgeLogger.logWarn(
                    "Data purging has been disabled. No data will be purged.",
                    null);
            return;
        }

        ClusterTask purgeMgrTask = getPurgeLock();

        try {
            // Prune the job map
            Iterator<PurgeJob> iter = purgeJobs.values().iterator();
            while (iter.hasNext()) {
                if (!iter.next().isAlive()) {
                    iter.remove();
                }
            }
            Calendar purgeTimeOutLimit = Calendar.getInstance();
            purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
            purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
            Calendar purgeFrequencyLimit = Calendar.getInstance();
            purgeFrequencyLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
            purgeFrequencyLimit.add(Calendar.MINUTE, -purgeFrequency);

            // Gets the list of plugins in ascending order by the last time
            // they were purged
            List<String> pluginList = dao.getPluginsByPurgeTime();

            // check for any new plugins or database being purged and needing
            // entries recreated
            Set<String> availablePlugins = new HashSet<String>(PluginRegistry
                    .getInstance().getRegisteredObjects());

            // Merge the lists
            availablePlugins.removeAll(pluginList);

            if (availablePlugins.size() > 0) {
                // generate new list with them at the beginning
                List<String> newSortedPlugins = new ArrayList<String>(
                        availablePlugins);
                Collections.sort(newSortedPlugins);
                newSortedPlugins.addAll(pluginList);
                pluginList = newSortedPlugins;
            }

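            // Cap the number of jobs this pass may start: the smaller of the
            // remaining cluster-wide capacity and the remaining capacity on
            // this server.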
            boolean canPurge = true;
            int jobsStarted = 0;
            int maxNumberOfJobsToStart = Math.min(
                    clusterLimit
                            - dao.getRunningClusterJobs(
                                    purgeTimeOutLimit.getTime(),
                                    fatalFailureCount), serverLimit
                            - getNumberRunningJobsOnServer(purgeTimeOutLimit));
            for (String plugin : pluginList) {
                try {
                    // initialize canPurge based on number of jobs started
                    canPurge = jobsStarted < maxNumberOfJobsToStart;
                    PurgeJob jobThread = purgeJobs.get(plugin);
                    PurgeJobStatus job = dao.getJobForPlugin(plugin);

                    if (job == null) {
                        // no job on server, generate empty job

                        try {
                            job = new PurgeJobStatus();
                            job.setPlugin(plugin);
                            job.setFailedCount(0);
                            job.setRunning(false);
                            job.setStartTime(new Date(0));
                            dao.create(job);
                        } catch (Throwable e) {
                            PurgeLogger.logError(
                                    "Failed to create new purge job entry",
                                    plugin, e);
                        }
                    }

                    // Check to see if this job has met the fatal failure count
                    if (job.getFailedCount() >= fatalFailureCount) {
                        canPurge = false;
                        PurgeLogger.logFatal(
                                "Purger for this plugin has reached or exceeded consecutive failure limit of "
                                        + fatalFailureCount
                                        + ". Data will no longer be purged for this plugin.",
                                plugin);
                    }

                    if (job.isRunning()) {
                        // check if job has timed out
                        if (purgeTimeOutLimit.getTimeInMillis() > job
                                .getStartTime().getTime()) {
                            // if no one else sets canPurge = false, purging
                            // will start on this server
                            if (jobThread != null) {
                                // job currently running on our server, don't
                                // start another
                                canPurge = false;
                                jobThread.printTimedOutMessage(deadPurgeJobAge);
                            }
                        }
                    } else {
                        // not currently running, check if it needs to be
                        // purged
                        Date startTime = job.getStartTime();
                        if (startTime != null
                                && startTime.getTime() >= purgeFrequencyLimit
                                        .getTimeInMillis()) {
                            canPurge = false;
                        }
                    }

                    if (canPurge) {
                        purgeJobs.put(plugin, purgeExpiredData(plugin));
                        jobsStarted++;
                    }
                } catch (Throwable e) {
                    PurgeLogger.logError(
                            "An unexpected error occurred during the purge job check for plugin",
                            plugin, e);
                }
            }
        } catch (Throwable e) {
            PurgeLogger.logError(
                    "An unexpected error occurred during the data purge process",
                    StatusConstants.CATEGORY_PURGE, e);
        } finally {
            // Unlock the purge task to allow other servers to run.
            ClusterLockUtils.unlock(purgeMgrTask, false);
            // PurgeLogger.logInfo(getPurgeStatus(true), null);
        }
    }

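    /**
     * Builds a human-readable summary of running, failed, and timed out purge
     * jobs across the cluster, for diagnostic logging. Currently unused; see
     * the commented-out call at the end of executePurge().
     *
     * @param verbose
     *            If true, the plugin names in each category are listed
     * @return The formatted status summary
     */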
    @SuppressWarnings("unused")
    private String getPurgeStatus(boolean verbose) {
        Calendar purgeTimeOutLimit = Calendar.getInstance();
        purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
        purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);

        StringBuilder builder = new StringBuilder();
        List<PurgeJobStatus> failedJobs = dao.getFailedJobs(fatalFailureCount);

        List<PurgeJobStatus> timedOutJobs = dao
                .getTimedOutJobs(purgeTimeOutLimit.getTime());
        int clusterJobs = dao.getRunningClusterJobs(
                purgeTimeOutLimit.getTime(), fatalFailureCount);
        Map<String, List<PurgeJobStatus>> serverMap = dao
                .getRunningServerJobs();
        builder.append("\nPURGE JOB STATUS:");
        builder.append("\n\tTotal Jobs Running On Cluster: ").append(
                clusterJobs);
        List<PurgeJobStatus> jobs = null;
        for (String server : serverMap.keySet()) {
            jobs = serverMap.get(server);
            builder.append("\n\tJobs Running On ").append(server).append(": ")
                    .append(jobs.size());
            if (verbose && !jobs.isEmpty()) {
                builder.append(" Plugins: ");
                for (int i = 0; i < jobs.size(); i++) {
                    builder.append(jobs.get(i).getPlugin());
                    if (i != jobs.size() - 1) {
                        builder.append(",");
                    }
                }
            }
        }
        if (verbose) {
            builder.append("\n\tFailed Jobs: ");
            if (failedJobs.isEmpty()) {
                builder.append("0");
            } else {
                PurgeJobStatus currentJob = null;
                for (int i = 0; i < failedJobs.size(); i++) {
                    currentJob = failedJobs.get(i);
                    builder.append(currentJob.getPlugin());
                    if (i != failedJobs.size() - 1) {
                        builder.append(",");
                    }
                }
            }

            builder.append("\n\tTimed Out Jobs: ");
            if (timedOutJobs.isEmpty()) {
                builder.append("0");
            } else {
                PurgeJobStatus currentJob = null;
                for (int i = 0; i < timedOutJobs.size(); i++) {
                    currentJob = timedOutJobs.get(i);
                    builder.append(currentJob.getPlugin());
                    if (i != timedOutJobs.size() - 1) {
                        builder.append(",");
                    }
                }
            }
        }
        return builder.toString();
    }

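    /**
     * Acquires the cluster-wide purge manager lock so that only one cluster
     * member starts purge jobs in a given pass.
     *
     * @return The acquired cluster task, or null if the lock was not acquired
     *         (acquisition failed, or the task is already running elsewhere)
     */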
    public ClusterTask getPurgeLock() {
        // Lock so only one cluster member may start purge processes
        ClusterTask purgeMgrTask = ClusterLockUtils.lock(PURGE_TASK_NAME,
                PURGE_TASK_DETAILS, PURGE_MANAGER_TIMEOUT, true);

        LockState purgeMgrLockState = purgeMgrTask.getLockState();
        switch (purgeMgrLockState) {
        case FAILED:
            PurgeLogger.logError(
                    "Purge Manager failed to acquire cluster task lock",
                    StatusConstants.CATEGORY_PURGE);
            return null;
        case OLD:
            PurgeLogger.logWarn("Purge Manager acquired old cluster task lock",
                    StatusConstants.CATEGORY_PURGE);
            break;
        case ALREADY_RUNNING:
            PurgeLogger.logWarn(
                    "Purge Manager acquired currently running cluster task lock",
                    StatusConstants.CATEGORY_PURGE);
            return null;
        case SUCCESSFUL:
            break;
        }
        return purgeMgrTask;
    }

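    /**
     * Counts the purge jobs in this server's job map, skipping only jobs that
     * have both exceeded the timeout and are in the BLOCKED state.
     *
     * @param timeOutTime
     *            The cutoff before which a job is considered timed out
     * @return The number of jobs considered running on this server
     */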
    private int getNumberRunningJobsOnServer(Calendar timeOutTime) {
        int rval = 0;
        for (PurgeJob job : purgeJobs.values()) {
            // if the job has not timed out, or the job is not blocked,
            // consider it running on this server
            if (timeOutTime.getTimeInMillis() < job.getStartTime()
                    || !job.getState().equals(State.BLOCKED)) {
                rval++;
            }
        }
        return rval;
    }

    /**
     * Starts a purge expired data job for the specified plugin. Using this
     * method allows for exceeding the failure count via a manual purge, as
     * well as kicking off a second purge for one already running on a server.
     *
     * @param plugin
     *            The plugin to purge the expired data for
     * @return The PurgeJob that was started
     */
    public PurgeJob purgeExpiredData(String plugin) {
        dao.startJob(plugin);
        PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_EXPIRED);
        job.start();
        return job;
    }

    /**
     * Starts a purge all data job for the specified plugin. Using this method
     * allows for exceeding the failure count via a manual purge, as well as
     * kicking off a second purge for one already running on a server.
     *
     * @param plugin
     *            The plugin to purge all data for
     * @return The PurgeJob that was started
     */
    public PurgeJob purgeAllData(String plugin) {
        dao.startJob(plugin);
        PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_ALL);
        job.start();
        return job;
    }

    public int getClusterLimit() {
        return clusterLimit;
    }

    public void setClusterLimit(int clusterLimit) {
        this.clusterLimit = clusterLimit;
    }

    public int getServerLimit() {
        return serverLimit;
    }

    public void setServerLimit(int serverLimit) {
        this.serverLimit = serverLimit;
    }

    public int getDeadPurgeJobAge() {
        return deadPurgeJobAge;
    }

    public void setDeadPurgeJobAge(int deadPurgeJobAge) {
        this.deadPurgeJobAge = deadPurgeJobAge;
    }

    public int getPurgeFrequency() {
        return purgeFrequency;
    }

    public void setPurgeFrequency(int purgeFrequency) {
        this.purgeFrequency = purgeFrequency;
    }

    public int getFatalFailureCount() {
        return this.fatalFailureCount;
    }

    public void setFatalFailureCount(int fatalFailureCount) {
        this.fatalFailureCount = fatalFailureCount;
    }

    public void setPurgeEnabled(boolean purgeEnabled) {
        this.purgeEnabled = purgeEnabled;
    }

    public boolean getPurgeEnabled() {
        return purgeEnabled;
    }
}

/**
 * This software was developed and / or modified by Raytheon Company,
 * pursuant to Contract DG133W-05-CQ-1067 with the US Government.
 *
 * U.S. EXPORT CONTROLLED TECHNICAL DATA
 * This software product contains export-restricted data whose
 * export/transfer/disclosure is restricted by U.S. law. Dissemination
 * to non-U.S. persons whether in the United States or abroad requires
 * an export license or other authorization.
 *
 * Contractor Name:        Raytheon Company
 * Contractor Address:     6825 Pine Street, Suite 340
 *                         Mail Stop B8
 *                         Omaha, NE 68106
 *                         402.291.0100
 *
 * See the AWIPS II Master Rights File ("Master Rights File.pdf") for
 * further licensing information.
 **/
package com.raytheon.uf.edex.purgesrv;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;

import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.purge.PurgeLogger;
import com.raytheon.uf.edex.database.status.StatusConstants;
import com.raytheon.uf.edex.purgesrv.PurgeJob.PURGE_JOB_TYPE;

/**
 *
 * Object for managing purge jobs. The purge manager relies on the purgejobs
 * table to coordinate information. The executePurge() method on this class is
 * executed every minute via a quartz timer defined in the purge-spring.xml
 * Spring configuration file.
 * <p>
 * The purge manager is designed to adhere to the following rules:
 * <p>
 * · The cluster may have no more than 6 purge jobs running simultaneously by
 * default. This property is configurable in the project.properties file<br>
 * · Any given server may have no more than 2 purge jobs running simultaneously
 * by default. This property is configurable in the project.properties file<br>
 * · A purge job for a plugin is considered 'hung' if it has been running for
 * more than 20 minutes by default. This property is configurable in the
 * project.properties file<br>
 * · If a purge job that was previously determined to be hung actually finishes
 * its execution, the cluster lock is updated appropriately and the purge job
 * is able to resume normal operation. This is in place so that if a hung purge
 * process goes unnoticed for a period of time, the server will still try to
 * recover autonomously if it can.<br>
 * · If a purge job is determined to be hung, the stack trace for the thread
 * executing the job is output to the log. Furthermore, if the job is in the
 * BLOCKED state, the stack traces for all other BLOCKED threads are output to
 * the purge log as part of a rudimentary deadlock detection strategy to be used
 * by personnel attempting to remedy the situation.<br>
 * · By default, a fatal condition occurs if a given plugin's purge job fails 3
 * consecutive times.<br>
 * · If a purge job hangs on one server in the cluster, it will try to run on
 * another cluster member at the next purge interval.<br>
 * · If the purge manager attempts to purge a plugin that has been running for
 * longer than the 20 minute threshold, it is considered a failure, and the
 * failure count is updated.
 * <p>
 *
 * <pre>
 *
 * SOFTWARE HISTORY
 *
 * Date         Ticket#    Engineer    Description
 * ------------ ---------- ----------- --------------------------
 * Apr 18, 2012 #470       bphillip    Initial creation
 *
 * </pre>
 *
 * @author bphillip
 * @version 1.0
 */
public class PurgeManager {

    /** Purge Manager task name */
    private static final String PURGE_TASK_NAME = "Purge Manager";

    /** Purge Manager task details */
    private static final String PURGE_TASK_DETAILS = "Purge Manager Job";

    /** Purge Manager task override timeout. Currently 2 minutes */
    private static final long PURGE_MANAGER_TIMEOUT = 120000;

    /**
     * The cluster limit property to be set via Spring with the value defined
     * in project.properties
     */
    private int clusterLimit = 6;

    /**
     * The server limit property to be set via Spring with the value defined
     * in project.properties
     */
    private int serverLimit = 2;

    /**
     * The time in minutes at which a purge job is considered 'dead' or
     * 'hung'. Set via Spring with the value defined in project.properties
     */
    private int deadPurgeJobAge = 20;

    /**
     * The frequency, in minutes, at which a plugin may be purged. Set via
     * Spring with the value defined in project.properties
     */
    private int purgeFrequency = 60;

    /**
     * How many times a purger is allowed to fail before it is considered
     * fatal. Set via Spring with the value defined in project.properties
     */
    private int fatalFailureCount = 3;

    /**
     * The master switch defined in project.properties that enables and
     * disables data purging
     */
    private boolean purgeEnabled = true;

    /** Map of purge jobs */
    private Map<String, PurgeJob> purgeJobs = new ConcurrentHashMap<String, PurgeJob>();

    private PurgeDao dao = new PurgeDao();

    private static PurgeManager instance = new PurgeManager();

    public static PurgeManager getInstance() {
        return instance;
    }

    /**
     * Creates a new PurgeManager
     */
    private PurgeManager() {
    }

    /**
     * Executes the purge routine
     */
    public void executePurge() {
        if (!purgeEnabled) {
            PurgeLogger.logWarn(
                    "Data purging has been disabled. No data will be purged.",
                    null);
            return;
        }

        ClusterTask purgeMgrTask = getPurgeLock();

        try {
            // Prune the job map
            Iterator<PurgeJob> iter = purgeJobs.values().iterator();
            while (iter.hasNext()) {
                if (!iter.next().isAlive()) {
                    iter.remove();
                }
            }

            Calendar purgeTimeOutLimit = Calendar.getInstance();
            purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
            purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);
            Calendar purgeFrequencyLimit = Calendar.getInstance();
            purgeFrequencyLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
            purgeFrequencyLimit.add(Calendar.MINUTE, -purgeFrequency);

            // Gets the list of plugins in ascending order by the last time
            // they were purged
            List<String> pluginList = dao.getPluginsByPurgeTime();

            // check for any new plugins or database being purged and needing
            // entries recreated
            Set<String> availablePlugins = new HashSet<String>(PluginRegistry
                    .getInstance().getRegisteredObjects());

            // Merge the lists
            availablePlugins.removeAll(pluginList);

            if (availablePlugins.size() > 0) {
                // generate new list with them at the beginning
                List<String> newSortedPlugins = new ArrayList<String>(
                        availablePlugins);
                Collections.sort(newSortedPlugins);
                newSortedPlugins.addAll(pluginList);
                pluginList = newSortedPlugins;
            }

            boolean canPurge = true;
            int jobsStarted = 0;
            int maxNumberOfJobsToStart = Math.min(
                    clusterLimit
                            - dao.getRunningClusterJobs(
                                    purgeTimeOutLimit.getTime(),
                                    fatalFailureCount), serverLimit
                            - getNumberRunningJobsOnServer(purgeTimeOutLimit));
            for (String plugin : pluginList) {
                try {
                    // initialize canPurge based on number of jobs started
                    canPurge = jobsStarted < maxNumberOfJobsToStart;
                    PurgeJob jobThread = purgeJobs.get(plugin);
                    PurgeJobStatus job = dao.getJobForPlugin(plugin);

                    if (job == null) {
                        // no job in database, generate empty job

                        try {
                            job = new PurgeJobStatus();
                            job.setPlugin(plugin);
                            job.setFailedCount(0);
                            job.setRunning(false);
                            job.setStartTime(new Date(0));
                            dao.create(job);
                        } catch (Throwable e) {
                            PurgeLogger.logError(
                                    "Failed to create new purge job entry",
                                    plugin, e);
                        }
                    }

                    // Check to see if this job has met the fatal failure count
                    if (job.getFailedCount() >= fatalFailureCount) {
                        canPurge = false;
                        PurgeLogger.logFatal(
                                "Purger for this plugin has reached or exceeded consecutive failure limit of "
                                        + fatalFailureCount
                                        + ". Data will no longer be purged for this plugin.",
                                plugin);
                    }

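                    // A job thread in our local map takes precedence over the
                    // database state: if this server already has the job, it
                    // never starts another, whatever the status row says.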
                    // is the purge job currently running on this server?
                    if (jobThread != null) {
                        // job currently running on our server, don't start
                        // another
                        canPurge = false;

                        if (purgeTimeOutLimit.getTimeInMillis() > jobThread
                                .getStartTime()) {
                            jobThread.printTimedOutMessage(deadPurgeJobAge);
                        }
                    } else {
                        if (job.isRunning()) {
                            // check if job has timed out
                            if (purgeTimeOutLimit.getTime().before(
                                    job.getStartTime())) {
                                canPurge = false;
                            }
                            // else if no one else sets canPurge = false,
                            // purging will start on this server
                        } else {
                            // not currently running, check if it needs to be
                            // purged
                            Date startTime = job.getStartTime();
                            if (startTime != null
                                    && startTime.after(purgeFrequencyLimit
                                            .getTime())) {
                                canPurge = false;
                            }
                        }
                    }

                    if (canPurge) {
                        purgeJobs.put(plugin, purgeExpiredData(plugin));
                        jobsStarted++;
                    }
                } catch (Throwable e) {
                    PurgeLogger.logError(
                            "An unexpected error occurred during the purge job check for plugin",
                            plugin, e);
                }
            }
        } catch (Throwable e) {
            PurgeLogger.logError(
                    "An unexpected error occurred during the data purge process",
                    StatusConstants.CATEGORY_PURGE, e);
        } finally {
            // Unlock the purge task to allow other servers to run.
            ClusterLockUtils.unlock(purgeMgrTask, false);
            // PurgeLogger.logInfo(getPurgeStatus(true), null);
        }
    }

    @SuppressWarnings("unused")
    private String getPurgeStatus(boolean verbose) {
        Calendar purgeTimeOutLimit = Calendar.getInstance();
        purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT"));
        purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge);

        StringBuilder builder = new StringBuilder();
        List<PurgeJobStatus> failedJobs = dao.getFailedJobs(fatalFailureCount);

        List<PurgeJobStatus> timedOutJobs = dao
                .getTimedOutJobs(purgeTimeOutLimit.getTime());
        int clusterJobs = dao.getRunningClusterJobs(
                purgeTimeOutLimit.getTime(), fatalFailureCount);
        Map<String, List<PurgeJobStatus>> serverMap = dao
                .getRunningServerJobs();
        builder.append("\nPURGE JOB STATUS:");
        builder.append("\n\tTotal Jobs Running On Cluster: ").append(
                clusterJobs);
        List<PurgeJobStatus> jobs = null;
        for (String server : serverMap.keySet()) {
            jobs = serverMap.get(server);
            builder.append("\n\tJobs Running On ").append(server).append(": ")
                    .append(jobs.size());
            if (verbose && !jobs.isEmpty()) {
                builder.append(" Plugins: ");
                for (int i = 0; i < jobs.size(); i++) {
                    builder.append(jobs.get(i).getPlugin());
                    if (i != jobs.size() - 1) {
                        builder.append(",");
                    }
                }
            }
        }
        if (verbose) {
            builder.append("\n\tFailed Jobs: ");
            if (failedJobs.isEmpty()) {
                builder.append("0");
            } else {
                PurgeJobStatus currentJob = null;
                for (int i = 0; i < failedJobs.size(); i++) {
                    currentJob = failedJobs.get(i);
                    builder.append(currentJob.getPlugin());
                    if (i != failedJobs.size() - 1) {
                        builder.append(",");
                    }
                }
            }

            builder.append("\n\tTimed Out Jobs: ");
            if (timedOutJobs.isEmpty()) {
                builder.append("0");
            } else {
                PurgeJobStatus currentJob = null;
                for (int i = 0; i < timedOutJobs.size(); i++) {
                    currentJob = timedOutJobs.get(i);
                    builder.append(currentJob.getPlugin());
                    if (i != timedOutJobs.size() - 1) {
                        builder.append(",");
                    }
                }
            }
        }
        return builder.toString();
    }

    public ClusterTask getPurgeLock() {
        // Lock so only one cluster member may start purge processes
        ClusterTask purgeMgrTask = ClusterLockUtils.lock(PURGE_TASK_NAME,
                PURGE_TASK_DETAILS, PURGE_MANAGER_TIMEOUT, true);

        LockState purgeMgrLockState = purgeMgrTask.getLockState();
        switch (purgeMgrLockState) {
        case FAILED:
            PurgeLogger.logError(
                    "Purge Manager failed to acquire cluster task lock",
                    StatusConstants.CATEGORY_PURGE);
            return null;
        case OLD:
            PurgeLogger.logWarn("Purge Manager acquired old cluster task lock",
                    StatusConstants.CATEGORY_PURGE);
            break;
        case ALREADY_RUNNING:
            PurgeLogger.logWarn(
                    "Purge Manager acquired currently running cluster task lock",
                    StatusConstants.CATEGORY_PURGE);
            return null;
        case SUCCESSFUL:
            break;
        }
        return purgeMgrTask;
    }

    private int getNumberRunningJobsOnServer(Calendar timeOutTime) {
        int rval = 0;
        for (PurgeJob job : purgeJobs.values()) {
            // if the job has not timed out, or the job is not blocked,
            // consider it running on this server
            if (timeOutTime.getTimeInMillis() < job.getStartTime()
                    || !job.getState().equals(State.BLOCKED)) {
                rval++;
            }
        }
        return rval;
    }

    /**
     * Starts a purge expired data job for the specified plugin. Using this
     * method allows for exceeding the failure count via a manual purge, as
     * well as kicking off a second purge for one already running on a server.
     *
     * @param plugin
     *            The plugin to purge the expired data for
     * @return The PurgeJob that was started
     */
    public PurgeJob purgeExpiredData(String plugin) {
        dao.startJob(plugin);
        PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_EXPIRED);
        job.start();
        return job;
    }

    /**
     * Starts a purge all data job for the specified plugin. Using this method
     * allows for exceeding the failure count via a manual purge, as well as
     * kicking off a second purge for one already running on a server.
     *
     * @param plugin
     *            The plugin to purge all data for
     * @return The PurgeJob that was started
     */
    public PurgeJob purgeAllData(String plugin) {
        dao.startJob(plugin);
        PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_ALL);
        job.start();
        return job;
    }

    public int getClusterLimit() {
        return clusterLimit;
    }

    public void setClusterLimit(int clusterLimit) {
        this.clusterLimit = clusterLimit;
    }

    public int getServerLimit() {
        return serverLimit;
    }

    public void setServerLimit(int serverLimit) {
        this.serverLimit = serverLimit;
    }

    public int getDeadPurgeJobAge() {
        return deadPurgeJobAge;
    }

    public void setDeadPurgeJobAge(int deadPurgeJobAge) {
        this.deadPurgeJobAge = deadPurgeJobAge;
    }

    public int getPurgeFrequency() {
        return purgeFrequency;
    }

    public void setPurgeFrequency(int purgeFrequency) {
        this.purgeFrequency = purgeFrequency;
    }

    public int getFatalFailureCount() {
        return this.fatalFailureCount;
    }

    public void setFatalFailureCount(int fatalFailureCount) {
        this.fatalFailureCount = fatalFailureCount;
    }

    public void setPurgeEnabled(boolean purgeEnabled) {
        this.purgeEnabled = purgeEnabled;
    }

    public boolean getPurgeEnabled() {
        return purgeEnabled;
    }
}

0
rpms/awips2.edex/deploy.builder/build.sh
Executable file → Normal file