diff --git a/RadarServer/com.raytheon.rcm.lib/src/com/raytheon/rcm/config/awips1/Awips1RpsListUtil.java b/RadarServer/com.raytheon.rcm.lib/src/com/raytheon/rcm/config/awips1/Awips1RpsListUtil.java old mode 100755 new mode 100644 diff --git a/RadarServer/com.raytheon.rcm.server/src/com/raytheon/rcm/config/awips1/Awips1ConfigProvider.java b/RadarServer/com.raytheon.rcm.server/src/com/raytheon/rcm/config/awips1/Awips1ConfigProvider.java old mode 100755 new mode 100644 diff --git a/TextDao.java b/TextDao.java new file mode 100644 index 0000000000..069873d95d --- /dev/null +++ b/TextDao.java @@ -0,0 +1,72 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.edex.plugin.text.dao; + +import java.util.Calendar; + +import com.raytheon.edex.db.dao.DefaultPluginDao; +import com.raytheon.edex.textdb.dbapi.impl.TextDB; +import com.raytheon.uf.common.dataplugin.PluginException; +import com.raytheon.uf.edex.database.purge.PurgeLogger; + +/** + * DAO for text products + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date       	Ticket#		Engineer	Description
+ * ------------	----------	-----------	--------------------------
+ * Jul 10, 2009 2191        rjpeter     Update retention time handling.
+ * Aug 18, 2009 2191        rjpeter     Changed to version purging.
+ * 
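A note on the purge strategy introduced for this DAO: purgeExpiredData() below runs the expensive full purge of standard text products only when the current hour of day is a multiple of three, because an incremental purge already runs every minute. A minimal standalone sketch of that time-gated pattern follows; the runFullPurge() hook is illustrative and stands in for TextDB.purgeStdTextProducts().

import java.util.Calendar;

public class TimeGatedPurgeExample {

    // Incremental purging is scheduled every minute elsewhere; only the
    // costly full purge is gated on the hour of day, as in the TextDao below.
    public void purgeExpiredData() {
        if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) {
            runFullPurge();
        }
    }

    // Illustrative stand-in for TextDB.purgeStdTextProducts().
    private void runFullPurge() {
        System.out.println("running full text product purge");
    }
}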
+ * + * @author + * @version 1 + */ +public class TextDao extends DefaultPluginDao { + + public TextDao(String pluginName) throws PluginException { + super(pluginName); + } + + @Override + public void purgeAllData() { + logger.warn("purgeAllPluginData not implemented for text. No data will be purged."); + } + + protected void loadScripts() throws PluginException { + // no op + } + + public void purgeExpiredData() throws PluginException { + int deletedRecords = 0; + + // only do full purge every few hours since incremental purge runs every + // minute + if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) { + TextDB.purgeStdTextProducts(); + } + + PurgeLogger.logInfo("Purged " + deletedRecords + " items total.", + "text"); + } +} diff --git a/cave/build/static/linux/cave/caveEnvironment/lib/libgempak.so b/cave/build/static/linux/cave/caveEnvironment/lib/libgempak.so old mode 100755 new mode 100644 diff --git a/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/AlertVizClient.java b/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/AlertVizClient.java index 125ad50b33..4da25ea3ae 100644 --- a/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/AlertVizClient.java +++ b/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/AlertVizClient.java @@ -21,6 +21,7 @@ package com.raytheon.uf.viz.alertviz; import java.io.PrintStream; import java.io.StringWriter; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.ExceptionListener; @@ -88,7 +89,11 @@ public class AlertVizClient implements MessageListener { private CopyOnWriteArrayList listeners; - private Marshaller marshaller; + private static int POOL_SIZE = 5; + + private java.util.Queue marshallers = new ConcurrentLinkedQueue(); + + private JAXBContext jaxbContext; private static AlertVizClient instance; @@ -134,8 +139,7 @@ public class AlertVizClient implements MessageListener { this.consumer.setMessageListener(this); reconnect = false; lastReconnectTime = System.currentTimeMillis(); - JAXBContext context = JAXBContext.newInstance(StatusMessage.class); - marshaller = context.createMarshaller(); + jaxbContext = JAXBContext.newInstance(StatusMessage.class); } catch (JMSException e) { reconnect = true; throw new AlertvizException("Unable to connect to notification", e); @@ -158,8 +162,11 @@ public class AlertVizClient implements MessageListener { if (retryOnExceptions == false && reconnect == true) { printToConsole(statusMessage); } else { + Marshaller marshaller = null; + try { StringWriter sw = new StringWriter(); + marshaller = getMarshaller(); marshaller.marshal(statusMessage, sw); ActiveMQTextMessage message = new ActiveMQTextMessage(); message.setText(sw.toString()); @@ -178,9 +185,21 @@ public class AlertVizClient implements MessageListener { } catch (Exception e) { throw new AlertvizException("Error sending message", e); } + + if (marshaller != null && marshallers.size() < POOL_SIZE) { + marshallers.add(marshaller); + } } } + private Marshaller getMarshaller() throws JAXBException { + Marshaller m = marshallers.poll(); + if (m == null) { + m = jaxbContext.createMarshaller(); + } + return m; + } + /** * @param statusMessage */ diff --git a/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/Container.java b/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/Container.java index 9414e72b71..7062fd2a67 100644 --- 
a/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/Container.java +++ b/cave/com.raytheon.uf.viz.alertviz/src/com/raytheon/uf/viz/alertviz/Container.java @@ -188,38 +188,41 @@ public class Container implements IConfigurationChangedListener { private boolean isShotGun(StatusMessage message) { boolean retVal = false; if (lastMessage != null) { + final long shotgunMessageCheckTime = this.shotgunMessageStartTime == 0 ? this.lastMessage + .getEventTime().getTime() : this.shotgunMessageStartTime; + if (this.lastMessage.getCategory().equals(message.getCategory()) && this.lastMessage.getPriority() == message.getPriority() && this.lastMessage.getMessage().equals( message.getMessage()) - && Math.abs(this.lastMessage.getEventTime().getTime() - - message.getEventTime().getTime()) < SHOTGUN_MESSAGE_MILLISECOND_THRESHOLD) { + && (Math.abs(message.getEventTime().getTime() + - shotgunMessageCheckTime) < SHOTGUN_MESSAGE_MILLISECOND_THRESHOLD)) { retVal = true; ++this.shotgunMessageCount; if (this.shotgunMessageStartTime == 0) { - this.shotgunMessageStartTime = message.getEventTime() + this.shotgunMessageStartTime = lastMessage.getEventTime() .getTime(); } } else { - if (this.shotgunMessageCount != 0) { + if (this.shotgunMessageCount > 1) { StringBuilder sb = new StringBuilder("Received ") .append(this.shotgunMessageCount) .append(" duplicate messages in ") - .append(message.getEventTime().getTime() + .append(this.lastMessage.getEventTime().getTime() - this.shotgunMessageStartTime) .append(" milliseconds. For message: ") .append(this.lastMessage.getCategory()).append(":") .append(this.lastMessage.getSourceKey()) .append(" ").append(this.lastMessage.getMessage()); - this.shotgunMessageStartTime = 0; - this.shotgunMessageCount = 0; StatusMessage sm = new StatusMessage( this.lastMessage.getSourceKey(), "GDN_ADMIN", this.lastMessage.getPriority(), this.lastMessage.getPlugin(), sb.toString(), null); sm.setEventTime(SimulatedTime.getSystemTime().getTime()); - messageReceived(sm); + logInternal(sm); } + this.shotgunMessageStartTime = 0; + this.shotgunMessageCount = 1; } } diff --git a/cave/com.raytheon.viz.aviation/src/com/raytheon/viz/aviation/monitor/TafSiteComp.java b/cave/com.raytheon.viz.aviation/src/com/raytheon/viz/aviation/monitor/TafSiteComp.java old mode 100755 new mode 100644 diff --git a/cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/AWIPSHeaderBlockDlg.java b/cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/AWIPSHeaderBlockDlg.java index 0d74cb925e..7bf7c1d750 100644 --- a/cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/AWIPSHeaderBlockDlg.java +++ b/cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/AWIPSHeaderBlockDlg.java @@ -86,6 +86,7 @@ import com.raytheon.viz.ui.dialogs.CaveSWTDialog; * 06/28/2010 4639 cjeanbap Allow user to create a new text product. * * 01/26/2012 14468 D.Friedman Fix initial BBB field selection. + * 05/30/2012 15046 D.Friedman Always set addressee field to ALL. * * * @author lvenable @@ -448,7 +449,7 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements addresseeTF.setTextLimit(4); addresseeTF.setLayoutData(rd); // Set the "default" addressee to "ALL". 
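The AlertVizClient change earlier in this patch stops reusing a single JAXB Marshaller, which is not safe for concurrent use, and instead hands each caller a Marshaller taken from a ConcurrentLinkedQueue, creating one on demand and returning it to the pool afterwards when the pool is below its cap. A standalone sketch of that pooling pattern, with an illustrative Message payload in place of StatusMessage:

import java.io.StringWriter;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.annotation.XmlRootElement;

public class MarshallerPoolExample {

    private static final int POOL_SIZE = 5;

    // The JAXBContext is thread-safe and reusable; Marshaller instances are not.
    private final JAXBContext jaxbContext;

    private final Queue<Marshaller> marshallers = new ConcurrentLinkedQueue<>();

    public MarshallerPoolExample() throws JAXBException {
        jaxbContext = JAXBContext.newInstance(Message.class);
    }

    public String marshal(Message msg) throws JAXBException {
        Marshaller m = marshallers.poll();
        if (m == null) {
            // Pool is empty: create a new Marshaller on demand.
            m = jaxbContext.createMarshaller();
        }
        try {
            StringWriter sw = new StringWriter();
            m.marshal(msg, sw);
            return sw.toString();
        } finally {
            // Return the Marshaller only while the pool is below its cap.
            if (marshallers.size() < POOL_SIZE) {
                marshallers.add(m);
            }
        }
    }

    /** Illustrative payload standing in for StatusMessage. */
    @XmlRootElement
    public static class Message {
        public String text;
    }
}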
- addresseeTF.setText(parentEditor.getAddressee()); + addresseeTF.setText("ALL"); // When the number of characters enter reaches the max limit and // the caret position is at the end then switch focus to the next @@ -459,16 +460,8 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements .getTextLimit()) { wmoTtaaiiTF.setFocus(); } - - // If the user changes the text in the addressee text field - // then "untoggle" the toggle buttons. - if (addresseeTF.getText().compareTo("000") != 0 - && addresseeTF.getText().compareTo("DEF") != 0 - && addresseeTF.getText().compareTo("ALL") != 0) { - zerosBtn.setSelection(false); - defBtn.setSelection(false); - allBtn.setSelection(false); - } + + handleAddresseeModified(); } }); @@ -518,6 +511,7 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements addresseeTF.setText("ALL"); } }); + handleAddresseeModified(); Label sepLbl = new Label(shell, SWT.SEPARATOR | SWT.HORIZONTAL); sepLbl.setLayoutData(new GridData(GridData.FILL_HORIZONTAL)); @@ -945,4 +939,16 @@ public class AWIPSHeaderBlockDlg extends CaveSWTDialog implements } }); } + + private void handleAddresseeModified() { + // If the user changes the text in the addressee text field + // then update the toggle buttons. + String addressee = addresseeTF.getText(); + if (zerosBtn != null) + zerosBtn.setSelection("000".equals(addressee)); + if (defBtn != null) + defBtn.setSelection("DEF".equals(addressee)); + if (allBtn != null) + allBtn.setSelection("ALL".equals(addressee)); + } } diff --git a/cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/TextEditorDialog.java b/cave/com.raytheon.viz.texteditor/src/com/raytheon/viz/texteditor/dialogs/TextEditorDialog.java old mode 100644 new mode 100755 diff --git a/cave/com.raytheon.viz.warngen/src/com/raytheon/viz/warngen/gui/WarngenLayer.java b/cave/com.raytheon.viz.warngen/src/com/raytheon/viz/warngen/gui/WarngenLayer.java index 6c419e1f50..a122f318d1 100644 --- a/cave/com.raytheon.viz.warngen/src/com/raytheon/viz/warngen/gui/WarngenLayer.java +++ b/cave/com.raytheon.viz.warngen/src/com/raytheon/viz/warngen/gui/WarngenLayer.java @@ -1734,13 +1734,19 @@ public class WarngenLayer extends AbstractStormTrackResource { Matcher m = tmlPtrn.matcher(rawMessage); if (m.find()) { - int day = warnRecord.getIssueTime().get(Calendar.DAY_OF_MONTH); - int hour = Integer.parseInt(m.group(1)); - int minute = Integer.parseInt(m.group(2)); + Calendar issueTime = warnRecord.getIssueTime(); + int day = issueTime.get(Calendar.DAY_OF_MONTH); + int tmlHour = Integer.parseInt(m.group(1)); + // This is for the case when the warning text was created, + // but actually issued the next day. 
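The comment above introduces the hunk that follows: the hour and minute are parsed out of the warning text (the TML line), and when that hour is later than the issue hour the product was composed before midnight but issued the next day, so the day-of-month is rolled back by one. A standalone sketch of that reconstruction, assuming GMT times and an already-parsed hour/minute pair (unlike the patch, the sketch also pins year and month to the issue time so the example output is deterministic):

import java.util.Calendar;
import java.util.TimeZone;

public class TmlTimeExample {

    // Anchors a parsed HH:MM on the product's issue time. If the parsed hour
    // is greater than the issue hour, the text predates midnight, so the
    // day-of-month is decremented, mirroring the WarngenLayer change.
    public static Calendar resolveFrameTime(Calendar issueTime, int tmlHour, int tmlMinute) {
        int day = issueTime.get(Calendar.DAY_OF_MONTH);
        if (tmlHour > issueTime.get(Calendar.HOUR_OF_DAY)) {
            day--;
        }
        Calendar frameTime = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
        frameTime.set(Calendar.YEAR, issueTime.get(Calendar.YEAR));
        frameTime.set(Calendar.MONTH, issueTime.get(Calendar.MONTH));
        frameTime.set(Calendar.DAY_OF_MONTH, day);
        frameTime.set(Calendar.HOUR_OF_DAY, tmlHour);
        frameTime.set(Calendar.MINUTE, tmlMinute);
        return frameTime;
    }

    public static void main(String[] args) {
        Calendar issue = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
        issue.set(2012, Calendar.MAY, 30, 0, 5);              // issued 00:05Z on the 30th
        Calendar frame = resolveFrameTime(issue, 23, 58);     // text written at 23:58Z
        System.out.println(frame.get(Calendar.DAY_OF_MONTH)); // prints 29
    }
}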
+ if (tmlHour > issueTime.get(Calendar.HOUR_OF_DAY)) { + day--; + } + int tmlMinute = Integer.parseInt(m.group(2)); frameTime = Calendar.getInstance(TimeZone.getTimeZone("GMT")); frameTime.set(Calendar.DAY_OF_MONTH, day); - frameTime.set(Calendar.HOUR_OF_DAY, hour); - frameTime.set(Calendar.MINUTE, minute); + frameTime.set(Calendar.HOUR_OF_DAY, tmlHour); + frameTime.set(Calendar.MINUTE, tmlMinute); } else { frameTime = warnRecord.getIssueTime(); } diff --git a/deltaScripts/12.5.1/drop_gfe_tables.sh b/deltaScripts/12.5.1/drop_gfe_tables.sh old mode 100644 new mode 100755 diff --git a/edexOsgi/build.edex/esb/data/utility/common_static/base/warngen/specialMarineWarningFollowup.vm b/edexOsgi/build.edex/esb/data/utility/common_static/base/warngen/specialMarineWarningFollowup.vm old mode 100644 new mode 100755 index 9b712b286a..fb3ca03b3c --- a/edexOsgi/build.edex/esb/data/utility/common_static/base/warngen/specialMarineWarningFollowup.vm +++ b/edexOsgi/build.edex/esb/data/utility/common_static/base/warngen/specialMarineWarningFollowup.vm @@ -43,6 +43,7 @@ #if(${action}!="CANCON") ${WMOId} ${vtecOffice} 000000 ${BBBId} MWS${siteId} + #if(${productClass}=="T") TEST...MARINE WEATHER STATEMENT...TEST #else @@ -478,6 +479,7 @@ REPORT SEVERE WEATHER TO THE COAST GUARD OR NEAREST LAW ENFORCEMENT AGENCY. THEY #if(${action}=="CANCON") ${WMOId} ${vtecOffice} 000000 ${BBBId} MWS${siteId} + #if(${productClass}=="T") TEST...MARINE WEATHER STATEMENT...TEST #else diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/lock/LockManager.java b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/lock/LockManager.java index e275cfa16c..4c27b184cd 100644 --- a/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/lock/LockManager.java +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/src/com/raytheon/edex/plugin/gfe/server/lock/LockManager.java @@ -450,7 +450,7 @@ public class LockManager { for (int i = 0; i < locks.size() - 1; i++) { currentLock = locks.get(i); nextLock = locks.get(i + 1); - if (currentLock.getEndTime() >= nextLock.getStartTime()) { + if (currentLock.getEndTime() >= nextLock.getStartTime() && currentLock.getWsId().equals(nextLock.getWsId())) { lockCombined = true; deleted.add(currentLock); deleted.add(nextLock); diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/spatial/GribSpatialCache.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/spatial/GribSpatialCache.java index e25ca22c79..feac8b8fa9 100644 --- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/spatial/GribSpatialCache.java +++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/spatial/GribSpatialCache.java @@ -1,750 +1,750 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. 
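The one-line LockManager change above tightens how adjacent locks are coalesced: two neighbouring lock records are now combined only when their time ranges touch or overlap and they are held by the same workstation ID, so one client's lock is never merged into another's. A standalone sketch of that merge rule over a list sorted by start time; the Lock record here is illustrative, not the GFE lock type:

import java.util.ArrayList;
import java.util.List;

public class LockMergeExample {

    // Illustrative lock record: start/end in millis plus an owning workstation id.
    record Lock(long start, long end, String wsId) {}

    // Coalesces neighbouring locks only when they overlap (or abut) and share
    // the same workstation id, mirroring the added getWsId() equality check.
    static List<Lock> combine(List<Lock> sortedByStart) {
        List<Lock> result = new ArrayList<>();
        for (Lock lock : sortedByStart) {
            Lock last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && last.end() >= lock.start()
                    && last.wsId().equals(lock.wsId())) {
                result.set(result.size() - 1,
                        new Lock(last.start(), Math.max(last.end(), lock.end()), last.wsId()));
            } else {
                result.add(lock);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Lock> locks = List.of(
                new Lock(0, 10, "wsA"),
                new Lock(10, 20, "wsA"),  // same owner, touching: merged
                new Lock(20, 30, "wsB")); // touching but different owner: kept
        // prints [Lock[start=0, end=20, wsId=wsA], Lock[start=20, end=30, wsId=wsB]]
        System.out.println(combine(locks));
    }
}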
- **/ - -package com.raytheon.edex.plugin.grib.spatial; - -import java.io.File; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.opengis.metadata.spatial.PixelOrientation; - -import com.raytheon.edex.plugin.grib.dao.GribModelDao; -import com.raytheon.edex.plugin.grib.dao.GridCoverageDao; -import com.raytheon.edex.plugin.grib.dao.IGridCoverageDao; -import com.raytheon.edex.site.SiteUtil; -import com.raytheon.uf.common.awipstools.GetWfoCenterPoint; -import com.raytheon.uf.common.dataplugin.grib.exception.GribException; -import com.raytheon.uf.common.dataplugin.grib.spatial.projections.GridCoverage; -import com.raytheon.uf.common.dataplugin.grib.subgrid.SubGrid; -import com.raytheon.uf.common.dataplugin.grib.subgrid.SubGridDef; -import com.raytheon.uf.common.dataplugin.grib.util.GribModelLookup; -import com.raytheon.uf.common.dataplugin.grib.util.GridModel; -import com.raytheon.uf.common.geospatial.MapUtil; -import com.raytheon.uf.common.localization.IPathManager; -import com.raytheon.uf.common.localization.LocalizationContext; -import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel; -import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType; -import com.raytheon.uf.common.localization.LocalizationFile; -import com.raytheon.uf.common.localization.PathManagerFactory; -import com.raytheon.uf.common.serialization.SerializationException; -import com.raytheon.uf.common.serialization.SerializationUtil; -import com.raytheon.uf.edex.awipstools.GetWfoCenterHandler; -import com.raytheon.uf.edex.core.EDEXUtil; -import com.raytheon.uf.edex.database.DataAccessLayerException; -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; -import com.raytheon.uf.edex.database.cluster.ClusterTask; -import com.raytheon.uf.edex.database.dao.CoreDao; -import com.raytheon.uf.edex.database.dao.DaoConfig; -import com.vividsolutions.jts.geom.Coordinate; - -/** - * Cache used for holding GridCoverage objects. Since creating geometries and - * CRS objects are expensive operations, this cache is used to store - * GridCoverages as they are produced. - * - *
- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#     Engineer    Description
- * ------------ ----------  ----------- --------------------------
- * 4/7/09       1994        bphillip    Initial Creation
- * 
- * 
- * - * @author bphillip - * @version 1 - */ -public class GribSpatialCache { - - /** The logger */ - protected transient Log logger = LogFactory.getLog(getClass()); - - /** The singleton instance */ - private static GribSpatialCache instance = new GribSpatialCache(); - - /** - * Map containing the GridCoverages
- * The key for this map is the id field of the GridCoverage object stored as - * the value of the map - */ - private Map spatialMap; - - /** - * Map containing the GridCoverages
- * The key for this map is the name field of the GridCoverage object stored - * as the value of the map. This is only used internally for lookup of a - * coverage by name aka gridId. - */ - private Map spatialNameMap; - - /** - * Map containing the subGrid coverage based on a model name. - */ - private Map subGridCoverageMap; - - /** - * Map containing the subGrid definition based on a model name. - */ - private Map definedSubGridMap; - - /** - * Gets the singleton instance of GribSpatialCache - * - * @return The singleton instance of the GribSpatialCache - */ - public static GribSpatialCache getInstance() { - return instance; - } - - /** - * Creates a new GribSpatialCache - */ - private GribSpatialCache() { - spatialMap = new HashMap(); - spatialNameMap = new HashMap(); - definedSubGridMap = new HashMap(); - subGridCoverageMap = new HashMap(); - initializeGrids(); - } - - /** - * Retrieves a grid from the map. If the grid does not exist, null is - * returned - * - * @param id - * The id of the GridCoverage to retrieve - * @return The GridCoverage object, null if not present - * @throws GribException - * @throws DataAccessLayerException - */ - public GridCoverage getGrid(GridCoverage coverage) throws GribException { - GridCoverage retVal = spatialMap.get(coverage.getId()); - - if (retVal == null) { - /* - * Coverage not found in cache, but the values provided in the GDS - * may be slightly different than those for the grid in the cache. - * Check the database to be sure. - */ - try { - retVal = ((IGridCoverageDao) EDEXUtil.getESBComponent(coverage - .getProjectionType().replaceAll(" ", "") + "Dao")) - .checkGrid(coverage); - } catch (DataAccessLayerException e) { - throw new GribException("Error querying for grib coverage!", e); - } - - if (retVal != null) { - spatialMap.put(coverage.getId(), retVal); - spatialNameMap.put(coverage.getName(), retVal); - } - - } - - return retVal; - } - - public GridCoverage getGrid(int id) { - return spatialMap.get(id); - } - - public GridCoverage getGrid(String modelName) { - GridCoverage rval = null; - - if (modelName != null) { - if (subGridCoverageMap.containsKey(modelName)) { - rval = spatialMap.get(subGridCoverageMap.get(modelName)); - } else { - GridModel model = GribModelLookup.getInstance().getModelByName( - modelName); - if (model != null) { - rval = spatialNameMap.get(model.getGrid().toString()); - } - } - } - - return rval; - } - - public GridCoverage getGridByName(String name) { - return spatialNameMap.get(name); - } - - /** - * Puts a grid into the GribSpatialCache. - * - * @param grid - * The grid to store - * @param persistToDb - * True if this GridCoverage object is to be persisted to the - * database - * @throws GribException - * If problems occur while initializing the grid - */ - public void putGrid(GridCoverage grid, boolean initializeGrid, - boolean persistToDb) throws GribException { - if (initializeGrid) { - /* - * Prepare the grid to be stored into the cache. 
Initializes the - * geometry and crs objects and generates the id field - */ - grid.initialize(); - if (grid.getName() == null) { - grid.generateName(); - } - } - - // Persist to the database if desired - if (persistToDb) { - new CoreDao(DaoConfig.DEFAULT).saveOrUpdate(grid); - } - - spatialMap.put(grid.getId(), grid); - spatialNameMap.put(grid.getName(), grid); - } - - public SubGrid getSubGrid(String modelName) { - return definedSubGridMap.get(modelName); - } - - public GridCoverage getSubGridCoverage(String modelName) { - GridCoverage rval = null; - - if (subGridCoverageMap.containsKey(modelName)) { - rval = spatialMap.get(subGridCoverageMap.get(modelName)); - } - - return rval; - } - - /** - * Initializes the predefined set of grids. The grids are stored in xml - * format in the utility folder so the localization service has access to - * them.
- * GridCoverage are created from the xml via JaxB and placed in the cache - */ - private void initializeGrids() { - ClusterTask ct = null; - - do { - ct = ClusterLockUtils.lock("grib", "spatialCache", 120000, true); - } while (!LockState.SUCCESSFUL.equals(ct.getLockState())); - - try { - // pull all the coverage from the database - GridCoverageDao dao = new GridCoverageDao(); - FileDataList previousFdl = getPreviousFileDataList(); - FileDataList currentFdl = generateFileDataList(); - - if (isDefintionChanged(previousFdl, currentFdl)) { - processBaseGridsChanged(dao, currentFdl); - saveFileDataList(currentFdl); - } else { - List baseCoverages = dao - .loadBaseGrids(); - - if (baseCoverages != null && baseCoverages.size() > 0) { - for (Object obj : baseCoverages) { - try { - putGrid((GridCoverage) obj, false, false); - } catch (Exception e) { - // Log error but do not throw exception, technically - // is - // only from initialize which isn't being called - logger.error( - "Unable to load grid coverage into cache " - + obj, e); - } - } - } else { - // database wiped/plugin re-initialized need to repopulate - processBaseGridsChanged(dao, currentFdl); - saveFileDataList(currentFdl); - } - } - - processUnknownGrids(dao); - processSubGrids(dao, currentFdl); - } finally { - ClusterLockUtils.unlock(ct, false); - } - } - - /** - * A non subgridded definition has been added, deleted, or changed. - * Changed/delete both delete all records, models, and coverage defintion. - * Then Change/Add put in a new coverage definition. - * - * TODO: Post process Unknown definitions to see if they are now known. If - * now known delete definitions of unknown. - * - * @param dao - * @param currentFdl - */ - private void processBaseGridsChanged(GridCoverageDao dao, - FileDataList currentFdl) { - List baseCoverages = dao.loadBaseGrids(); - Map fileCoverageMap = loadGridDefinitionsFromDisk(currentFdl); - - // update needs to delete all hdf5 same as delete, so update is - // a delete and then an add to simplify logic and handle primary - // key changes. - List coveragesToDelete = new LinkedList(); - HashSet validDbCoverageNames = new HashSet( - (int) (baseCoverages.size() * 1.25) + 1); - - Iterator iter = baseCoverages.iterator(); - while (iter.hasNext()) { - GridCoverage dbCov = iter.next(); - GridCoverage fileCoverage = fileCoverageMap.get(dbCov.getName()); - if (!dbCov.equals(fileCoverage)) { - // coverage not in flat file or coverage has changed, - // delete coverage old coverage - coveragesToDelete.add(dbCov); - iter.remove(); - } else { - // current coverage still valid - validDbCoverageNames.add(dbCov.getName()); - } - } - - // delete grids, models, coverages, and hdf5 for namesToDelete. - for (GridCoverage cov : coveragesToDelete) { - logger.info("GridCoverage " + cov.getName() - + " has changed. Deleting out of date data"); - if (!dao.deleteCoverageAssociatedData(cov, true)) { - logger.warn("Failed to delete GridCoverage " + cov.getName() - + ". 
Manual intervention required."); - } else { - logger.info("GridCoverage successfully deleted"); - } - } - - // remove the valid db coverages from the map - fileCoverageMap.keySet().removeAll(validDbCoverageNames); - - // add new grids in bulk - for (GridCoverage cov : fileCoverageMap.values()) { - try { - putGrid(cov, true, false); - } catch (Exception e) { - logger.error( - "Failed to initialize grid definition " + cov.getName(), - e); - } - } - - // bulk persist the spatial maps - if (spatialMap.size() > 0) { - dao.persistAll(spatialMap.values()); - } - - for (GridCoverage cov : baseCoverages) { - try { - putGrid(cov, false, false); - } catch (Exception e) { - logger.error( - "Failed to initialize grid definition " + cov.getName(), - e); - } - } - } - - /** - * A non subGridd definition has been added, deleted, or changed. - * Changed/delete both delete all records, models, and coverage defintion. - * Then Change/Add put in a new coverage definition, and also delete any - * data associated with base model definition. - * - * @param dao - * @param currentFdl - */ - private void processSubGrids(GridCoverageDao dao, FileDataList currentFdl) { - List oldSubGridCoverages = dao.loadSubGrids(); - Map fileSubGridCoverageMap = loadSubGridDefinitionsFromDisk(currentFdl); - - // update needs to delete all hdf5 same as delete, so update is - // a delete and then an add to simplify logic and handle primary - // key changes. - List coveragesToDelete = new LinkedList(); - HashSet validDbCoverageNames = new HashSet( - (int) (oldSubGridCoverages.size() * 1.25) + 1); - - Iterator iter = oldSubGridCoverages.iterator(); - while (iter.hasNext()) { - GridCoverage dbCov = iter.next(); - GridCoverage fileCoverage = fileSubGridCoverageMap.get(dbCov - .getName()); - if (!dbCov.equals(fileCoverage)) { - // coverage not in flat file or coverage has changed, - // delete coverage - coveragesToDelete.add(dbCov); - iter.remove(); - } else { - // current coverage still valid - validDbCoverageNames.add(dbCov.getName()); - } - } - - // delete grids, models, coverages, and hdf5 for namesToDelete. - for (GridCoverage cov : coveragesToDelete) { - logger.info("Model " - + cov.getSubGridModel() - + " has changed subGrid definition, deleting out of date data"); - if (!dao.deleteCoverageAssociatedData(cov, true)) { - logger.warn("Failed to delete GridCoverage " + cov.getName() - + ". Manual intervention required."); - } else { - logger.info("GridModel successfully deleted"); - } - } - - // remove the valid db coverages from the map - fileSubGridCoverageMap.keySet().removeAll(validDbCoverageNames); - - // need to delete model information for new adds, as old grid may not - // have been subgridded - GribModelDao modelDao = new GribModelDao(); - for (GridCoverage cov : fileSubGridCoverageMap.values()) { - logger.info("Model " - + cov.getSubGridModel() - + " has changed subGrid definition, deleting out of date data"); - // look up parent - if (modelDao.deleteModelAndAssociatedData(cov.getSubGridModel()) < 0) { - logger.warn("Failed to delete SubGrid Model " - + cov.getSubGridModel() - + ". 
Manual intervention required."); - } else { - logger.info("GridModel successfully deleted"); - } - } - - // add new grids, persisting individually - for (GridCoverage cov : fileSubGridCoverageMap.values()) { - try { - putGrid(cov, true, true); - subGridCoverageMap.put(cov.getSubGridModel(), cov.getId()); - } catch (Exception e) { - logger.error( - "Failed to initialize grid definition " + cov.getName(), - e); - } - } - - // put database grids into map - for (GridCoverage cov : oldSubGridCoverages) { - try { - putGrid(cov, true, true); - subGridCoverageMap.put(cov.getSubGridModel(), cov.getId()); - } catch (Exception e) { - logger.error( - "Failed to initialize grid definition " + cov.getName(), - e); - } - } - } - - private void processUnknownGrids(GridCoverageDao dao) { - List unknownGrids = dao.loadUnknownGrids(); - for (GridCoverage cov : unknownGrids) { - try { - GridCoverage dbCov = getGrid(cov); - if (!cov.getName().equals(dbCov.getName())) { - logger.info("Unknown grid " + cov.getName() - + " is now mapped by " + dbCov.getName() - + ". Deleting unknown grid"); - dao.deleteCoverageAssociatedData(cov, true); - } - } catch (Exception e) { - logger.error("Erro occurred scanning unknown grids", e); - } - } - } - - private Map loadSubGridDefinitionsFromDisk( - FileDataList currentFdl) { - GribModelLookup gribModelLUT = GribModelLookup.getInstance(); - List subGridDefs = currentFdl.getSubGridFileList(); - Map subGrids = null; - - if (subGridDefs != null && subGridDefs.size() > 0) { - subGrids = new HashMap(subGridDefs.size() * 3); - Coordinate wfoCenterPoint = null; - String wfo = SiteUtil.getSite(); - GetWfoCenterPoint centerPointRequest = new GetWfoCenterPoint(wfo); - try { - wfoCenterPoint = new GetWfoCenterHandler() - .handleRequest(centerPointRequest); - } catch (Exception e) { - logger.error( - "Failed to generate sub grid definitions. 
Unable to lookup WFO Center Point", - e); - return new HashMap(0); - } - - for (FileData fd : subGridDefs) { - try { - SubGridDef subGridDef = loadSubGridDef(fd.getFilePath()); - - if (subGridDef != null) { - String referenceModel = subGridDef.getReferenceModel(); - - GridCoverage gridCoverage = getGrid(referenceModel); - - if (gridCoverage == null) { - Coordinate wfoCenter = MapUtil - .latLonToGridCoordinate(wfoCenterPoint, - PixelOrientation.CENTER, - gridCoverage); - - double xCenterPoint = wfoCenter.x; - double yCenterPoint = wfoCenter.y; - - double xDistance = subGridDef.getNx() / 2; - double yDistance = subGridDef.getNy() / 2; - Coordinate lowerLeftPosition = new Coordinate( - xCenterPoint - xDistance, yCenterPoint - + yDistance); - Coordinate upperRightPosition = new Coordinate( - xCenterPoint + xDistance, yCenterPoint - - yDistance); - - lowerLeftPosition = MapUtil.gridCoordinateToLatLon( - lowerLeftPosition, PixelOrientation.CENTER, - gridCoverage); - upperRightPosition = MapUtil - .gridCoordinateToLatLon(upperRightPosition, - PixelOrientation.CENTER, - gridCoverage); - - subGridDef.setLowerLeftLon(lowerLeftPosition.x); - subGridDef.setLowerLeftLat(lowerLeftPosition.y); - subGridDef.setUpperRightLon(upperRightPosition.x); - subGridDef.setUpperRightLat(upperRightPosition.y); - - // verify numbers in -180 -> 180 range - subGridDef.setLowerLeftLon(MapUtil - .correctLon(subGridDef.getLowerLeftLon())); - subGridDef.setUpperRightLon(MapUtil - .correctLon(subGridDef.getUpperRightLon())); - - // do a reverse lookup of the model name to get its - // associated grid id - - for (String modelName : subGridDef.getModelNames()) { - GridModel model = gribModelLUT - .getModelByName(modelName); - if (model != null) { - GridCoverage baseCoverage = spatialNameMap - .get(model.getGrid().toString()); - - if (baseCoverage != null) { - SubGrid subGrid = new SubGrid(); - subGrid.setModelName(modelName); - GridCoverage subGridCoverage = baseCoverage - .trim(subGridDef, subGrid); - if (subGridCoverage != null) { - subGrids.put( - subGridCoverage.getName(), - subGridCoverage); - definedSubGridMap.put(modelName, - subGrid); - } - } - } - } - } else { - logger.error("Failed to generate sub grid for " - + fd.getFilePath() - + ". Unable to determine coverage for referenceModel [" - + referenceModel + "]"); - } - } - } catch (Exception e) { - // Log error but do not throw exception - logger.error( - "Failed processing sub grid file: " - + fd.getFilePath(), e); - } - } - } else { - subGrids = new HashMap(0); - } - - return subGrids; - } - - /** - * Loads and validates subGridDef pointed to by filePath. If definition - * empty/invalid returns null. 
- * - * @param filePath - * @return - */ - private SubGridDef loadSubGridDef(String filePath) { - SubGridDef rval = null; - File f = new File(filePath); - - if (f.length() > 0) { - try { - rval = (SubGridDef) SerializationUtil - .jaxbUnmarshalFromXmlFile(f); - if (rval.getReferenceModel() == null - || rval.getModelNames() == null - || rval.getModelNames().size() == 0) { - // sub grid didn't have required definitions - rval = null; - } - } catch (SerializationException e) { - logger.error("Failed reading sub grid file: " + filePath, e); - } - } - - return rval; - } - - private static boolean isDefintionChanged(FileDataList previousFdl, - FileDataList currentFdl) { - boolean rval = true; - if (currentFdl != null) { - rval = !currentFdl.equals(previousFdl); - } else { - rval = previousFdl != null; - } - - return rval; - } - - private FileDataList generateFileDataList() { - /* - * Retrieve the list of files from the localization service - */ - IPathManager pm = PathManagerFactory.getPathManager(); - FileDataList fileList = new FileDataList(); - LocalizationContext[] contexts = pm - .getLocalSearchHierarchy(LocalizationType.EDEX_STATIC); - fileList.addCoverageFiles(pm.listFiles(contexts, "/grib/grids", - new String[] { "xml" }, true, true)); - fileList.addSubGridFiles(pm.listFiles(contexts, "/grib/subgrids", - new String[] { "xml" }, true, true)); - - return fileList; - } - - private FileDataList getPreviousFileDataList() { - IPathManager pm = PathManagerFactory.getPathManager(); - File previousFileData = pm.getFile(pm.getContext( - LocalizationType.EDEX_STATIC, LocalizationLevel.CONFIGURED), - "/grib/gridDefFileListing.xml"); - FileDataList rval = null; - - if (previousFileData.exists() && previousFileData.length() > 0) { - try { - Object obj = SerializationUtil - .jaxbUnmarshalFromXmlFile(previousFileData); - if (obj instanceof FileDataList) { - rval = (FileDataList) obj; - } else { - logger.error("Error occurred deserializing " - + previousFileData.getAbsolutePath() - + ", expected type " + FileDataList.class - + " received " + obj.getClass()); - } - } catch (Exception e) { - logger.error( - "Error occurred deserializing " - + previousFileData.getAbsolutePath(), e); - } - } - return rval; - } - - private Map loadGridDefinitionsFromDisk( - FileDataList currentFdl) { - List coverageFiles = currentFdl.getCoverageFileList(); - Map fileCoverageMap = new HashMap( - (int) (coverageFiles.size() * 1.25) + 1); - - /* - * Iterate over file list. Unmarshal to GridCoverage object - */ - for (FileData fd : coverageFiles) { - try { - GridCoverage grid = (GridCoverage) SerializationUtil - .jaxbUnmarshalFromXmlFile(fd.getFilePath()); - GridCoverage previousGrid = fileCoverageMap.put(grid.getName(), - grid); - if (previousGrid != null) { - for (FileData fd2 : coverageFiles) { - GridCoverage grid2 = (GridCoverage) SerializationUtil - .jaxbUnmarshalFromXmlFile(fd2.getFilePath()); - if (grid.getName().equals(grid2.getName())) { - logger.error("Grid " + grid.getName() - + " has already been defined. " - + fd.getFilePath() + " and " - + fd2.getFilePath() - + " have same name. 
Using " - + fd2.getFilePath()); - break; - } - } - } - } catch (Exception e) { - // Log error but do not throw exception - logger.error( - "Unable to read default grids file: " - + fd.getFilePath(), e); - } - } - - return fileCoverageMap; - } - - private void saveFileDataList(FileDataList fdl) { - try { - IPathManager pm = PathManagerFactory.getPathManager(); - LocalizationFile lf = pm.getLocalizationFile( - pm.getContext(LocalizationType.EDEX_STATIC, - LocalizationLevel.CONFIGURED), - "/grib/gridDefFileListing.xml"); - SerializationUtil.jaxbMarshalToXmlFile(fdl, lf.getFile() - .getAbsolutePath()); - lf.save(); - } catch (Exception e) { - logger.error( - "Failed to save coverage file data list, coverages may be reloaded on next restart", - e); - } - } - - public static void reinitialize() { - GribSpatialCache newInstance = new GribSpatialCache(); - instance = newInstance; - } -} +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ + +package com.raytheon.edex.plugin.grib.spatial; + +import java.io.File; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.opengis.metadata.spatial.PixelOrientation; + +import com.raytheon.edex.plugin.grib.dao.GribModelDao; +import com.raytheon.edex.plugin.grib.dao.GridCoverageDao; +import com.raytheon.edex.plugin.grib.dao.IGridCoverageDao; +import com.raytheon.edex.site.SiteUtil; +import com.raytheon.uf.common.awipstools.GetWfoCenterPoint; +import com.raytheon.uf.common.dataplugin.grib.exception.GribException; +import com.raytheon.uf.common.dataplugin.grib.spatial.projections.GridCoverage; +import com.raytheon.uf.common.dataplugin.grib.subgrid.SubGrid; +import com.raytheon.uf.common.dataplugin.grib.subgrid.SubGridDef; +import com.raytheon.uf.common.dataplugin.grib.util.GribModelLookup; +import com.raytheon.uf.common.dataplugin.grib.util.GridModel; +import com.raytheon.uf.common.geospatial.MapUtil; +import com.raytheon.uf.common.localization.IPathManager; +import com.raytheon.uf.common.localization.LocalizationContext; +import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel; +import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType; +import com.raytheon.uf.common.localization.LocalizationFile; +import com.raytheon.uf.common.localization.PathManagerFactory; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.serialization.SerializationUtil; +import com.raytheon.uf.edex.awipstools.GetWfoCenterHandler; +import com.raytheon.uf.edex.core.EDEXUtil; +import com.raytheon.uf.edex.database.DataAccessLayerException; +import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; +import 
com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; +import com.raytheon.uf.edex.database.cluster.ClusterTask; +import com.raytheon.uf.edex.database.dao.CoreDao; +import com.raytheon.uf.edex.database.dao.DaoConfig; +import com.vividsolutions.jts.geom.Coordinate; + +/** + * Cache used for holding GridCoverage objects. Since creating geometries and + * CRS objects are expensive operations, this cache is used to store + * GridCoverages as they are produced. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#     Engineer    Description
+ * ------------ ----------  ----------- --------------------------
+ * 4/7/09       1994        bphillip    Initial Creation
+ * 
+ * 
+ * + * @author bphillip + * @version 1 + */ +public class GribSpatialCache { + + /** The logger */ + protected transient Log logger = LogFactory.getLog(getClass()); + + /** The singleton instance */ + private static GribSpatialCache instance = new GribSpatialCache(); + + /** + * Map containing the GridCoverages
+ * The key for this map is the id field of the GridCoverage object stored as + * the value of the map + */ + private Map spatialMap; + + /** + * Map containing the GridCoverages
+ * The key for this map is the name field of the GridCoverage object stored + * as the value of the map. This is only used internally for lookup of a + * coverage by name aka gridId. + */ + private Map spatialNameMap; + + /** + * Map containing the subGrid coverage based on a model name. + */ + private Map subGridCoverageMap; + + /** + * Map containing the subGrid definition based on a model name. + */ + private Map definedSubGridMap; + + /** + * Gets the singleton instance of GribSpatialCache + * + * @return The singleton instance of the GribSpatialCache + */ + public static GribSpatialCache getInstance() { + return instance; + } + + /** + * Creates a new GribSpatialCache + */ + private GribSpatialCache() { + spatialMap = new HashMap(); + spatialNameMap = new HashMap(); + definedSubGridMap = new HashMap(); + subGridCoverageMap = new HashMap(); + initializeGrids(); + } + + /** + * Retrieves a grid from the map. If the grid does not exist, null is + * returned + * + * @param id + * The id of the GridCoverage to retrieve + * @return The GridCoverage object, null if not present + * @throws GribException + * @throws DataAccessLayerException + */ + public GridCoverage getGrid(GridCoverage coverage) throws GribException { + GridCoverage retVal = spatialMap.get(coverage.getId()); + + if (retVal == null) { + /* + * Coverage not found in cache, but the values provided in the GDS + * may be slightly different than those for the grid in the cache. + * Check the database to be sure. + */ + try { + retVal = ((IGridCoverageDao) EDEXUtil.getESBComponent(coverage + .getProjectionType().replaceAll(" ", "") + "Dao")) + .checkGrid(coverage); + } catch (DataAccessLayerException e) { + throw new GribException("Error querying for grib coverage!", e); + } + + if (retVal != null) { + spatialMap.put(coverage.getId(), retVal); + spatialNameMap.put(coverage.getName(), retVal); + } + + } + + return retVal; + } + + public GridCoverage getGrid(int id) { + return spatialMap.get(id); + } + + public GridCoverage getGrid(String modelName) { + GridCoverage rval = null; + + if (modelName != null) { + if (subGridCoverageMap.containsKey(modelName)) { + rval = spatialMap.get(subGridCoverageMap.get(modelName)); + } else { + GridModel model = GribModelLookup.getInstance().getModelByName( + modelName); + if (model != null) { + rval = spatialNameMap.get(model.getGrid().toString()); + } + } + } + + return rval; + } + + public GridCoverage getGridByName(String name) { + return spatialNameMap.get(name); + } + + /** + * Puts a grid into the GribSpatialCache. + * + * @param grid + * The grid to store + * @param persistToDb + * True if this GridCoverage object is to be persisted to the + * database + * @throws GribException + * If problems occur while initializing the grid + */ + public void putGrid(GridCoverage grid, boolean initializeGrid, + boolean persistToDb) throws GribException { + if (initializeGrid) { + /* + * Prepare the grid to be stored into the cache. 
Initializes the + * geometry and crs objects and generates the id field + */ + grid.initialize(); + if (grid.getName() == null) { + grid.generateName(); + } + } + + // Persist to the database if desired + if (persistToDb) { + new CoreDao(DaoConfig.DEFAULT).saveOrUpdate(grid); + } + + spatialMap.put(grid.getId(), grid); + spatialNameMap.put(grid.getName(), grid); + } + + public SubGrid getSubGrid(String modelName) { + return definedSubGridMap.get(modelName); + } + + public GridCoverage getSubGridCoverage(String modelName) { + GridCoverage rval = null; + + if (subGridCoverageMap.containsKey(modelName)) { + rval = spatialMap.get(subGridCoverageMap.get(modelName)); + } + + return rval; + } + + /** + * Initializes the predefined set of grids. The grids are stored in xml + * format in the utility folder so the localization service has access to + * them.
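In the methods that follow (initializeGrids() and processBaseGridsChanged()), the coverages already persisted in the database are reconciled against the definitions currently on disk: any database coverage whose on-disk definition changed or disappeared is deleted and later re-added, unchanged ones are kept, and whatever remains in the file map after removing the still-valid names is added as new. A small standalone sketch of that reconciliation over plain maps; the String values stand in for full coverage definitions:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoverageReconcileExample {

    // Splits database entries into stale (changed or missing on disk) and
    // valid (unchanged), then returns the on-disk names that must be added.
    static List<String> reconcile(Map<String, String> dbDefs,
            Map<String, String> fileDefs, List<String> stale) {
        List<String> validNames = new ArrayList<>();
        for (Map.Entry<String, String> db : dbDefs.entrySet()) {
            String onDisk = fileDefs.get(db.getKey());
            if (!db.getValue().equals(onDisk)) {
                stale.add(db.getKey());       // changed, or no longer defined on disk
            } else {
                validNames.add(db.getKey());  // still matches the flat file
            }
        }
        // Everything left in the file map after removing valid names is new.
        Map<String, String> toAdd = new HashMap<>(fileDefs);
        toAdd.keySet().removeAll(validNames);
        return new ArrayList<>(toAdd.keySet());
    }

    public static void main(String[] args) {
        Map<String, String> db = Map.of("grid211", "v1", "grid212", "v1");
        Map<String, String> files = Map.of("grid211", "v2", "grid215", "v1");
        List<String> stale = new ArrayList<>();
        List<String> added = reconcile(db, files, stale);
        // grid211 changed and grid212 vanished, so both are stale;
        // grid211 (new definition) and grid215 must be added.
        System.out.println("delete " + stale + ", add " + added);
    }
}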
+ * GridCoverage are created from the xml via JaxB and placed in the cache + */ + private void initializeGrids() { + ClusterTask ct = null; + + do { + ct = ClusterLockUtils.lock("grib", "spatialCache", 120000, true); + } while (!LockState.SUCCESSFUL.equals(ct.getLockState())); + + try { + // pull all the coverage from the database + GridCoverageDao dao = new GridCoverageDao(); + FileDataList previousFdl = getPreviousFileDataList(); + FileDataList currentFdl = generateFileDataList(); + + if (isDefintionChanged(previousFdl, currentFdl)) { + processBaseGridsChanged(dao, currentFdl); + saveFileDataList(currentFdl); + } else { + List baseCoverages = dao + .loadBaseGrids(); + + if (baseCoverages != null && baseCoverages.size() > 0) { + for (Object obj : baseCoverages) { + try { + putGrid((GridCoverage) obj, false, false); + } catch (Exception e) { + // Log error but do not throw exception, technically + // is + // only from initialize which isn't being called + logger.error( + "Unable to load grid coverage into cache " + + obj, e); + } + } + } else { + // database wiped/plugin re-initialized need to repopulate + processBaseGridsChanged(dao, currentFdl); + saveFileDataList(currentFdl); + } + } + + processUnknownGrids(dao); + processSubGrids(dao, currentFdl); + } finally { + ClusterLockUtils.unlock(ct, false); + } + } + + /** + * A non subgridded definition has been added, deleted, or changed. + * Changed/delete both delete all records, models, and coverage defintion. + * Then Change/Add put in a new coverage definition. + * + * TODO: Post process Unknown definitions to see if they are now known. If + * now known delete definitions of unknown. + * + * @param dao + * @param currentFdl + */ + private void processBaseGridsChanged(GridCoverageDao dao, + FileDataList currentFdl) { + List baseCoverages = dao.loadBaseGrids(); + Map fileCoverageMap = loadGridDefinitionsFromDisk(currentFdl); + + // update needs to delete all hdf5 same as delete, so update is + // a delete and then an add to simplify logic and handle primary + // key changes. + List coveragesToDelete = new LinkedList(); + HashSet validDbCoverageNames = new HashSet( + (int) (baseCoverages.size() * 1.25) + 1); + + Iterator iter = baseCoverages.iterator(); + while (iter.hasNext()) { + GridCoverage dbCov = iter.next(); + GridCoverage fileCoverage = fileCoverageMap.get(dbCov.getName()); + if (!dbCov.equals(fileCoverage)) { + // coverage not in flat file or coverage has changed, + // delete coverage old coverage + coveragesToDelete.add(dbCov); + iter.remove(); + } else { + // current coverage still valid + validDbCoverageNames.add(dbCov.getName()); + } + } + + // delete grids, models, coverages, and hdf5 for namesToDelete. + for (GridCoverage cov : coveragesToDelete) { + logger.info("GridCoverage " + cov.getName() + + " has changed. Deleting out of date data"); + if (!dao.deleteCoverageAssociatedData(cov, true)) { + logger.warn("Failed to delete GridCoverage " + cov.getName() + + ". 
Manual intervention required."); + } else { + logger.info("GridCoverage successfully deleted"); + } + } + + // remove the valid db coverages from the map + fileCoverageMap.keySet().removeAll(validDbCoverageNames); + + // add new grids in bulk + for (GridCoverage cov : fileCoverageMap.values()) { + try { + putGrid(cov, true, false); + } catch (Exception e) { + logger.error( + "Failed to initialize grid definition " + cov.getName(), + e); + } + } + + // bulk persist the spatial maps + if (spatialMap.size() > 0) { + dao.persistAll(spatialMap.values()); + } + + for (GridCoverage cov : baseCoverages) { + try { + putGrid(cov, false, false); + } catch (Exception e) { + logger.error( + "Failed to initialize grid definition " + cov.getName(), + e); + } + } + } + + /** + * A non subGridd definition has been added, deleted, or changed. + * Changed/delete both delete all records, models, and coverage defintion. + * Then Change/Add put in a new coverage definition, and also delete any + * data associated with base model definition. + * + * @param dao + * @param currentFdl + */ + private void processSubGrids(GridCoverageDao dao, FileDataList currentFdl) { + List oldSubGridCoverages = dao.loadSubGrids(); + Map fileSubGridCoverageMap = loadSubGridDefinitionsFromDisk(currentFdl); + + // update needs to delete all hdf5 same as delete, so update is + // a delete and then an add to simplify logic and handle primary + // key changes. + List coveragesToDelete = new LinkedList(); + HashSet validDbCoverageNames = new HashSet( + (int) (oldSubGridCoverages.size() * 1.25) + 1); + + Iterator iter = oldSubGridCoverages.iterator(); + while (iter.hasNext()) { + GridCoverage dbCov = iter.next(); + GridCoverage fileCoverage = fileSubGridCoverageMap.get(dbCov + .getName()); + if (!dbCov.equals(fileCoverage)) { + // coverage not in flat file or coverage has changed, + // delete coverage + coveragesToDelete.add(dbCov); + iter.remove(); + } else { + // current coverage still valid + validDbCoverageNames.add(dbCov.getName()); + } + } + + // delete grids, models, coverages, and hdf5 for namesToDelete. + for (GridCoverage cov : coveragesToDelete) { + logger.info("Model " + + cov.getSubGridModel() + + " has changed subGrid definition, deleting out of date data"); + if (!dao.deleteCoverageAssociatedData(cov, true)) { + logger.warn("Failed to delete GridCoverage " + cov.getName() + + ". Manual intervention required."); + } else { + logger.info("GridModel successfully deleted"); + } + } + + // remove the valid db coverages from the map + fileSubGridCoverageMap.keySet().removeAll(validDbCoverageNames); + + // need to delete model information for new adds, as old grid may not + // have been subgridded + GribModelDao modelDao = new GribModelDao(); + for (GridCoverage cov : fileSubGridCoverageMap.values()) { + logger.info("Model " + + cov.getSubGridModel() + + " has changed subGrid definition, deleting out of date data"); + // look up parent + if (modelDao.deleteModelAndAssociatedData(cov.getSubGridModel()) < 0) { + logger.warn("Failed to delete SubGrid Model " + + cov.getSubGridModel() + + ". 
Manual intervention required."); + } else { + logger.info("GridModel successfully deleted"); + } + } + + // add new grids, persisting individually + for (GridCoverage cov : fileSubGridCoverageMap.values()) { + try { + putGrid(cov, true, true); + subGridCoverageMap.put(cov.getSubGridModel(), cov.getId()); + } catch (Exception e) { + logger.error( + "Failed to initialize grid definition " + cov.getName(), + e); + } + } + + // put database grids into map + for (GridCoverage cov : oldSubGridCoverages) { + try { + putGrid(cov, true, true); + subGridCoverageMap.put(cov.getSubGridModel(), cov.getId()); + } catch (Exception e) { + logger.error( + "Failed to initialize grid definition " + cov.getName(), + e); + } + } + } + + private void processUnknownGrids(GridCoverageDao dao) { + List unknownGrids = dao.loadUnknownGrids(); + for (GridCoverage cov : unknownGrids) { + try { + GridCoverage dbCov = getGrid(cov); + if (!cov.getName().equals(dbCov.getName())) { + logger.info("Unknown grid " + cov.getName() + + " is now mapped by " + dbCov.getName() + + ". Deleting unknown grid"); + dao.deleteCoverageAssociatedData(cov, true); + } + } catch (Exception e) { + logger.error("Erro occurred scanning unknown grids", e); + } + } + } + + private Map loadSubGridDefinitionsFromDisk( + FileDataList currentFdl) { + GribModelLookup gribModelLUT = GribModelLookup.getInstance(); + List subGridDefs = currentFdl.getSubGridFileList(); + Map subGrids = null; + + if (subGridDefs != null && subGridDefs.size() > 0) { + subGrids = new HashMap(subGridDefs.size() * 3); + Coordinate wfoCenterPoint = null; + String wfo = SiteUtil.getSite(); + GetWfoCenterPoint centerPointRequest = new GetWfoCenterPoint(wfo); + try { + wfoCenterPoint = new GetWfoCenterHandler() + .handleRequest(centerPointRequest); + } catch (Exception e) { + logger.error( + "Failed to generate sub grid definitions. 
Unable to lookup WFO Center Point", + e); + return new HashMap(0); + } + + for (FileData fd : subGridDefs) { + try { + SubGridDef subGridDef = loadSubGridDef(fd.getFilePath()); + + if (subGridDef != null) { + String referenceModel = subGridDef.getReferenceModel(); + + GridCoverage gridCoverage = getGrid(referenceModel); + + if (gridCoverage != null) { + Coordinate wfoCenter = MapUtil + .latLonToGridCoordinate(wfoCenterPoint, + PixelOrientation.CENTER, + gridCoverage); + + double xCenterPoint = wfoCenter.x; + double yCenterPoint = wfoCenter.y; + + double xDistance = subGridDef.getNx() / 2; + double yDistance = subGridDef.getNy() / 2; + Coordinate lowerLeftPosition = new Coordinate( + xCenterPoint - xDistance, yCenterPoint + + yDistance); + Coordinate upperRightPosition = new Coordinate( + xCenterPoint + xDistance, yCenterPoint + - yDistance); + + lowerLeftPosition = MapUtil.gridCoordinateToLatLon( + lowerLeftPosition, PixelOrientation.CENTER, + gridCoverage); + upperRightPosition = MapUtil + .gridCoordinateToLatLon(upperRightPosition, + PixelOrientation.CENTER, + gridCoverage); + + subGridDef.setLowerLeftLon(lowerLeftPosition.x); + subGridDef.setLowerLeftLat(lowerLeftPosition.y); + subGridDef.setUpperRightLon(upperRightPosition.x); + subGridDef.setUpperRightLat(upperRightPosition.y); + + // verify numbers in -180 -> 180 range + subGridDef.setLowerLeftLon(MapUtil + .correctLon(subGridDef.getLowerLeftLon())); + subGridDef.setUpperRightLon(MapUtil + .correctLon(subGridDef.getUpperRightLon())); + + // do a reverse lookup of the model name to get its + // associated grid id + + for (String modelName : subGridDef.getModelNames()) { + GridModel model = gribModelLUT + .getModelByName(modelName); + if (model != null) { + GridCoverage baseCoverage = spatialNameMap + .get(model.getGrid().toString()); + + if (baseCoverage != null) { + SubGrid subGrid = new SubGrid(); + subGrid.setModelName(modelName); + GridCoverage subGridCoverage = baseCoverage + .trim(subGridDef, subGrid); + if (subGridCoverage != null) { + subGrids.put( + subGridCoverage.getName(), + subGridCoverage); + definedSubGridMap.put(modelName, + subGrid); + } + } + } + } + } else { + logger.error("Failed to generate sub grid for " + + fd.getFilePath() + + ". Unable to determine coverage for referenceModel [" + + referenceModel + "]"); + } + } + } catch (Exception e) { + // Log error but do not throw exception + logger.error( + "Failed processing sub grid file: " + + fd.getFilePath(), e); + } + } + } else { + subGrids = new HashMap(0); + } + + return subGrids; + } + + /** + * Loads and validates subGridDef pointed to by filePath. If definition + * empty/invalid returns null. 
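loadSubGridDefinitionsFromDisk() above (note the corrected gridCoverage != null guard) centers each sub-grid on the WFO center point in grid space and derives the lower-left and upper-right corners by stepping half of the requested nx and ny before converting back to latitude/longitude. A standalone sketch of just the corner arithmetic, with a minimal Coordinate stand-in for the JTS class:

public class SubGridCornersExample {

    // Minimal stand-in for com.vividsolutions.jts.geom.Coordinate.
    static class Coordinate {
        final double x, y;
        Coordinate(double x, double y) { this.x = x; this.y = y; }
        @Override public String toString() { return "(" + x + ", " + y + ")"; }
    }

    // Grid rows increase downward, so the lower-left corner is half the height
    // below the center (larger y) and the upper-right is above it (smaller y),
    // matching the sign convention in the code above.
    static Coordinate[] corners(Coordinate center, double nx, double ny) {
        double xDistance = nx / 2;
        double yDistance = ny / 2;
        Coordinate lowerLeft = new Coordinate(center.x - xDistance, center.y + yDistance);
        Coordinate upperRight = new Coordinate(center.x + xDistance, center.y - yDistance);
        return new Coordinate[] { lowerLeft, upperRight };
    }

    public static void main(String[] args) {
        Coordinate[] c = corners(new Coordinate(100, 80), 50, 40);
        System.out.println("lower-left " + c[0] + ", upper-right " + c[1]);
        // lower-left (75.0, 100.0), upper-right (125.0, 60.0)
    }
}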
+ * + * @param filePath + * @return + */ + private SubGridDef loadSubGridDef(String filePath) { + SubGridDef rval = null; + File f = new File(filePath); + + if (f.length() > 0) { + try { + rval = (SubGridDef) SerializationUtil + .jaxbUnmarshalFromXmlFile(f); + if (rval.getReferenceModel() == null + || rval.getModelNames() == null + || rval.getModelNames().size() == 0) { + // sub grid didn't have required definitions + rval = null; + } + } catch (SerializationException e) { + logger.error("Failed reading sub grid file: " + filePath, e); + } + } + + return rval; + } + + private static boolean isDefintionChanged(FileDataList previousFdl, + FileDataList currentFdl) { + boolean rval = true; + if (currentFdl != null) { + rval = !currentFdl.equals(previousFdl); + } else { + rval = previousFdl != null; + } + + return rval; + } + + private FileDataList generateFileDataList() { + /* + * Retrieve the list of files from the localization service + */ + IPathManager pm = PathManagerFactory.getPathManager(); + FileDataList fileList = new FileDataList(); + LocalizationContext[] contexts = pm + .getLocalSearchHierarchy(LocalizationType.EDEX_STATIC); + fileList.addCoverageFiles(pm.listFiles(contexts, "/grib/grids", + new String[] { "xml" }, true, true)); + fileList.addSubGridFiles(pm.listFiles(contexts, "/grib/subgrids", + new String[] { "xml" }, true, true)); + + return fileList; + } + + private FileDataList getPreviousFileDataList() { + IPathManager pm = PathManagerFactory.getPathManager(); + File previousFileData = pm.getFile(pm.getContext( + LocalizationType.EDEX_STATIC, LocalizationLevel.CONFIGURED), + "/grib/gridDefFileListing.xml"); + FileDataList rval = null; + + if (previousFileData.exists() && previousFileData.length() > 0) { + try { + Object obj = SerializationUtil + .jaxbUnmarshalFromXmlFile(previousFileData); + if (obj instanceof FileDataList) { + rval = (FileDataList) obj; + } else { + logger.error("Error occurred deserializing " + + previousFileData.getAbsolutePath() + + ", expected type " + FileDataList.class + + " received " + obj.getClass()); + } + } catch (Exception e) { + logger.error( + "Error occurred deserializing " + + previousFileData.getAbsolutePath(), e); + } + } + return rval; + } + + private Map loadGridDefinitionsFromDisk( + FileDataList currentFdl) { + List coverageFiles = currentFdl.getCoverageFileList(); + Map fileCoverageMap = new HashMap( + (int) (coverageFiles.size() * 1.25) + 1); + + /* + * Iterate over file list. Unmarshal to GridCoverage object + */ + for (FileData fd : coverageFiles) { + try { + GridCoverage grid = (GridCoverage) SerializationUtil + .jaxbUnmarshalFromXmlFile(fd.getFilePath()); + GridCoverage previousGrid = fileCoverageMap.put(grid.getName(), + grid); + if (previousGrid != null) { + for (FileData fd2 : coverageFiles) { + GridCoverage grid2 = (GridCoverage) SerializationUtil + .jaxbUnmarshalFromXmlFile(fd2.getFilePath()); + if (grid.getName().equals(grid2.getName())) { + logger.error("Grid " + grid.getName() + + " has already been defined. " + + fd.getFilePath() + " and " + + fd2.getFilePath() + + " have same name. 
Using " + + fd2.getFilePath()); + break; + } + } + } + } catch (Exception e) { + // Log error but do not throw exception + logger.error( + "Unable to read default grids file: " + + fd.getFilePath(), e); + } + } + + return fileCoverageMap; + } + + private void saveFileDataList(FileDataList fdl) { + try { + IPathManager pm = PathManagerFactory.getPathManager(); + LocalizationFile lf = pm.getLocalizationFile( + pm.getContext(LocalizationType.EDEX_STATIC, + LocalizationLevel.CONFIGURED), + "/grib/gridDefFileListing.xml"); + SerializationUtil.jaxbMarshalToXmlFile(fdl, lf.getFile() + .getAbsolutePath()); + lf.save(); + } catch (Exception e) { + logger.error( + "Failed to save coverage file data list, coverages may be reloaded on next restart", + e); + } + } + + public static void reinitialize() { + GribSpatialCache newInstance = new GribSpatialCache(); + instance = newInstance; + } +} diff --git a/edexOsgi/com.raytheon.edex.plugin.text/src/com/raytheon/edex/plugin/text/dao/TextDao.java b/edexOsgi/com.raytheon.edex.plugin.text/src/com/raytheon/edex/plugin/text/dao/TextDao.java index ee6fd4319e..069873d95d 100644 --- a/edexOsgi/com.raytheon.edex.plugin.text/src/com/raytheon/edex/plugin/text/dao/TextDao.java +++ b/edexOsgi/com.raytheon.edex.plugin.text/src/com/raytheon/edex/plugin/text/dao/TextDao.java @@ -1,63 +1,72 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.edex.plugin.text.dao; - -import com.raytheon.edex.db.dao.DefaultPluginDao; -import com.raytheon.edex.textdb.dbapi.impl.TextDB; -import com.raytheon.uf.common.dataplugin.PluginException; -import com.raytheon.uf.edex.database.purge.PurgeLogger; - -/** - * DAO for text products - * - *
- * 
- * SOFTWARE HISTORY
- * 
- * Date       	Ticket#		Engineer	Description
- * ------------	----------	-----------	--------------------------
- * Jul 10, 2009 2191        rjpeter     Update retention time handling.
- * Aug 18, 2009 2191        rjpeter     Changed to version purging.
- * 
- * - * @author - * @version 1 - */ -public class TextDao extends DefaultPluginDao { - - public TextDao(String pluginName) throws PluginException { - super(pluginName); - } - - @Override - public void purgeAllData() { - logger.warn("purgeAllPluginData not implemented for text. No data will be purged."); - } - - protected void loadScripts() throws PluginException { - // no op - } - - public void purgeExpiredData() throws PluginException { - int deletedRecords = TextDB.purgeStdTextProducts(); - PurgeLogger.logInfo("Purged " + deletedRecords + " items total.", - "text"); - } -} +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.edex.plugin.text.dao; + +import java.util.Calendar; + +import com.raytheon.edex.db.dao.DefaultPluginDao; +import com.raytheon.edex.textdb.dbapi.impl.TextDB; +import com.raytheon.uf.common.dataplugin.PluginException; +import com.raytheon.uf.edex.database.purge.PurgeLogger; + +/** + * DAO for text products + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date       	Ticket#		Engineer	Description
+ * ------------	----------	-----------	--------------------------
+ * Jul 10, 2009 2191        rjpeter     Update retention time handling.
+ * Aug 18, 2009 2191        rjpeter     Changed to version purging.
+ * 
+ * + * @author + * @version 1 + */ +public class TextDao extends DefaultPluginDao { + + public TextDao(String pluginName) throws PluginException { + super(pluginName); + } + + @Override + public void purgeAllData() { + logger.warn("purgeAllPluginData not implemented for text. No data will be purged."); + } + + protected void loadScripts() throws PluginException { + // no op + } + + public void purgeExpiredData() throws PluginException { + int deletedRecords = 0; + + // only do full purge every few hours since incremental purge runs every + // minute + if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) { + TextDB.purgeStdTextProducts(); + } + + PurgeLogger.logInfo("Purged " + deletedRecords + " items total.", + "text"); + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeDao.java b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeDao.java index 7d2a7518bf..492c27e9c7 100644 --- a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeDao.java +++ b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeDao.java @@ -1,284 +1,282 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.uf.edex.purgesrv; - -import java.sql.Timestamp; -import java.util.Calendar; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; - -import org.hibernate.Query; -import org.hibernate.Session; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; - -import com.raytheon.uf.edex.database.dao.CoreDao; -import com.raytheon.uf.edex.database.dao.DaoConfig; - -/** - * - * Data access object for accessing purge job status objects - * - *
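In the new TextDao above, incremental purging of text products happens elsewhere every minute, so the full purge is now triggered only when the hour of day is divisible by three; as written, deletedRecords is left at zero, so the log line always reports zero items. Assuming TextDB.purgeStdTextProducts() still returns the number of purged rows, as the previous version of this method relied on, a variant that keeps the logged count meaningful could look like this sketch (same imports as the class above):

    public void purgeExpiredData() throws PluginException {
        int deletedRecords = 0;

        // Full purge only every third hour; the incremental purge runs every minute.
        if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) % 3 == 0) {
            // Assumes purgeStdTextProducts() still returns the purged row count.
            deletedRecords = TextDB.purgeStdTextProducts();
        }

        PurgeLogger.logInfo("Purged " + deletedRecords + " items total.", "text");
    }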
- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * May 1, 2012  #470      bphillip     Initial creation
- * 
- * 
- * - * @author bphillip - * @version 1.0 - */ -public class PurgeDao extends CoreDao { - - /** - * Constructs a new purge data access object - */ - public PurgeDao() { - super(DaoConfig.forClass(PurgeJobStatus.class)); - } - - /** - * Gets the number of purge jobs currently running on the cluster. A job is - * considered running if the 'running' flag is set to true and the job has - * been started since validStartTime and has not met or exceeded the failed - * count. - * - * @param validStartTime - * @param failedCount - * @return The number of purge jobs currently running on the cluster - */ - public int getRunningClusterJobs(final Date validStartTime, - final int failedCount) { - final String query = "from " - + daoClass.getName() - + " obj where obj.running = true and obj.startTime > :startTime and obj.failedCount <= :failedCount"; - return (Integer) txTemplate.execute(new TransactionCallback() { - @Override - public Object doInTransaction(TransactionStatus status) { - Query hibQuery = getSession(false).createQuery(query); - hibQuery.setTimestamp("startTime", validStartTime); - hibQuery.setInteger("failedCount", failedCount); - List queryResult = hibQuery.list(); - if (queryResult == null) { - return 0; - } else { - return queryResult.size(); - } - } - }); - } - - /** - * Returns the jobs that have met or exceed the failed count. - * - * @param failedCount - * @return - */ - @SuppressWarnings("unchecked") - public List getFailedJobs(final int failedCount) { - final String query = "from " + daoClass.getName() - + " obj where obj.failedCount >= :failedCount"; - return (List) txTemplate - .execute(new TransactionCallback() { - @Override - public List doInTransaction( - TransactionStatus status) { - Query hibQuery = getSession(false).createQuery(query); - hibQuery.setInteger("failedCount", failedCount); - return hibQuery.list(); - } - }); - } - - @SuppressWarnings("unchecked") - public List getTimedOutJobs(final Date validStartTime) { - final String query = "from " - + daoClass.getName() - + " obj where obj.running = true and obj.startTime <= :startTime"; - return (List) txTemplate - .execute(new TransactionCallback() { - @Override - public List doInTransaction( - TransactionStatus status) { - Query hibQuery = getSession(false).createQuery(query); - hibQuery.setTimestamp("startTime", validStartTime); - return hibQuery.list(); - } - }); - } - - @SuppressWarnings("unchecked") - public Map> getRunningServerJobs() { - final String query = "from " - + daoClass.getName() - + " obj where obj.running = true and obj.timedOut = false and obj.failed = false and obj.id.server=':SERVER'"; - return (Map>) txTemplate - .execute(new TransactionCallback() { - @Override - public Map> doInTransaction( - TransactionStatus status) { - Map> serverMap = new HashMap>(); - Query serverQuery = getSession(false).createQuery( - "select distinct obj.id.server from " - + daoClass.getName() - + " obj order by obj.id.server asc"); - List result = serverQuery.list(); - for (String server : result) { - Query query2 = getSession(false).createQuery( - query.replace(":SERVER", server)); - serverMap.put(server, query2.list()); - } - return serverMap; - } - }); - } - - /** - * Gets the amount of time in milliseconds since the last purge of a given - * plugin - * - * @param plugin - * The plugin name - * @return Number of milliseconds since the purge job was run for the given - * plugin - */ - public long getTimeSinceLastPurge(String plugin) { - final String query = "select obj.startTime from " + daoClass.getName() - + " obj 
where obj.id.plugin='" + plugin + "'"; - return (Long) txTemplate.execute(new TransactionCallback() { - @Override - public Object doInTransaction(TransactionStatus status) { - Query hibQuery = getSession(false).createQuery(query); - Timestamp queryResult = (Timestamp) hibQuery.uniqueResult(); - if (queryResult == null) { - return -1; - } else { - return System.currentTimeMillis() - queryResult.getTime(); - } - } - }); - } - - /** - * Gets the purge job status for a plugin - * - * @param plugin - * The plugin to get the purge job status for - * @return The purge job statuses - */ - public PurgeJobStatus getJobForPlugin(String plugin) { - final String query = "from " + daoClass.getName() - + " obj where obj.id.plugin='" + plugin + "'"; - return (PurgeJobStatus) txTemplate.execute(new TransactionCallback() { - @Override - public PurgeJobStatus doInTransaction(TransactionStatus status) { - Query hibQuery = getSession(false).createQuery(query); - PurgeJobStatus queryResult = (PurgeJobStatus) hibQuery - .uniqueResult(); - return queryResult; - } - }); - } - - /** - * Sets a purge job to running status and sets the startTime to current - * time. If was previously running, will increment the failed count. - * - * @param plugin - * The plugin row to update - */ - public void startJob(final String plugin) { - final String query = "from " + daoClass.getName() - + " obj where obj.id.plugin='" + plugin + "'"; - txTemplate.execute(new TransactionCallback() { - @Override - public PurgeJobStatus doInTransaction(TransactionStatus status) { - Session sess = getSession(false); - Query hibQuery = sess.createQuery(query); - PurgeJobStatus queryResult = (PurgeJobStatus) hibQuery - .uniqueResult(); - if (queryResult == null) { - queryResult = new PurgeJobStatus(); - queryResult.setFailedCount(0); - queryResult.setPlugin(plugin); - queryResult.setRunning(false); - sess.save(queryResult); - } - - // any changes to PurgeJobStatus will be commited at end of - // transaction - if (queryResult.isRunning()) { - // query was previously running, update failed count - queryResult.incrementFailedCount(); - } - - queryResult.setStartTime(Calendar.getInstance( - TimeZone.getTimeZone("GMT")).getTime()); - queryResult.setRunning(true); - - return queryResult; - } - }); - } - - /** - * Retrieves the plugins order by startTime. - * - * @param latestStartTime - * @param failedCount - * @return - */ - @SuppressWarnings("unchecked") - public List getPluginsByPurgeTime() { - final String query = "select obj.id.plugin from " + daoClass.getName() - + " obj order by obj.startTime asc, obj.plugin asc"; - return (List) txTemplate.execute(new TransactionCallback() { - @Override - public List doInTransaction(TransactionStatus status) { - Query hibQuery = getSession(false).createQuery(query); - List result = (List) hibQuery.list(); - return result; - } - }); - } - - /** - * Updates a purge job status object - * - * @param jobStatus - * The object to update - */ - public void update(final PurgeJobStatus jobStatus) { - txTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - public void doInTransactionWithoutResult(TransactionStatus status) { - getHibernateTemplate().update(jobStatus); - } - }); - } -} +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. 
Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.purgesrv; + +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import org.hibernate.Query; +import org.hibernate.Session; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; + +import com.raytheon.uf.edex.database.dao.CoreDao; +import com.raytheon.uf.edex.database.dao.DaoConfig; + +/** + * + * Data access object for accessing purge job status objects + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * May 1, 2012  #470      bphillip     Initial creation
+ * 
+ * 
+ * + * @author bphillip + * @version 1.0 + */ +public class PurgeDao extends CoreDao { + + /** + * Constructs a new purge data access object + */ + public PurgeDao() { + super(DaoConfig.forClass(PurgeJobStatus.class)); + } + + /** + * Gets the number of purge jobs currently running on the cluster. A job is + * considered running if the 'running' flag is set to true and the job has + * been started since validStartTime and has not met or exceeded the failed + * count. + * + * @param validStartTime + * @param failedCount + * @return The number of purge jobs currently running on the cluster + */ + public int getRunningClusterJobs(final Date validStartTime, + final int failedCount) { + final String query = "from " + + daoClass.getName() + + " obj where obj.running = true and obj.startTime > :startTime and obj.failedCount <= :failedCount"; + return (Integer) txTemplate.execute(new TransactionCallback() { + @Override + public Object doInTransaction(TransactionStatus status) { + Query hibQuery = getSession(false).createQuery(query); + hibQuery.setTimestamp("startTime", validStartTime); + hibQuery.setInteger("failedCount", failedCount); + List queryResult = hibQuery.list(); + if (queryResult == null) { + return 0; + } else { + return queryResult.size(); + } + } + }); + } + + /** + * Returns the jobs that have met or exceed the failed count. + * + * @param failedCount + * @return + */ + @SuppressWarnings("unchecked") + public List getFailedJobs(final int failedCount) { + final String query = "from " + daoClass.getName() + + " obj where obj.failedCount >= :failedCount"; + return (List) txTemplate + .execute(new TransactionCallback() { + @Override + public List doInTransaction( + TransactionStatus status) { + Query hibQuery = getSession(false).createQuery(query); + hibQuery.setInteger("failedCount", failedCount); + return hibQuery.list(); + } + }); + } + + @SuppressWarnings("unchecked") + public List getTimedOutJobs(final Date validStartTime) { + final String query = "from " + + daoClass.getName() + + " obj where obj.running = true and obj.startTime <= :startTime"; + return (List) txTemplate + .execute(new TransactionCallback() { + @Override + public List doInTransaction( + TransactionStatus status) { + Query hibQuery = getSession(false).createQuery(query); + hibQuery.setTimestamp("startTime", validStartTime); + return hibQuery.list(); + } + }); + } + + @SuppressWarnings("unchecked") + public Map> getRunningServerJobs() { + final String query = "from " + + daoClass.getName() + + " obj where obj.running = true and obj.timedOut = false and obj.failed = false and obj.id.server=':SERVER'"; + return (Map>) txTemplate + .execute(new TransactionCallback() { + @Override + public Map> doInTransaction( + TransactionStatus status) { + Map> serverMap = new HashMap>(); + Query serverQuery = getSession(false).createQuery( + "select distinct obj.id.server from " + + daoClass.getName() + + " obj order by obj.id.server asc"); + List result = serverQuery.list(); + for (String server : result) { + Query query2 = getSession(false).createQuery( + query.replace(":SERVER", server)); + serverMap.put(server, query2.list()); + } + return serverMap; + } + }); + } + + /** + * Gets the amount of time in milliseconds since the last purge of a given + * plugin + * + * @param plugin + * The plugin name + * @return Number of milliseconds since the purge job was run for the given + * plugin + */ + public long getTimeSinceLastPurge(String plugin) { + final String query = "select obj.startTime from " + daoClass.getName() + + " obj 
where obj.id.plugin='" + plugin + "'"; + return (Long) txTemplate.execute(new TransactionCallback() { + @Override + public Object doInTransaction(TransactionStatus status) { + Query hibQuery = getSession(false).createQuery(query); + Timestamp queryResult = (Timestamp) hibQuery.uniqueResult(); + if (queryResult == null) { + return -1; + } else { + return System.currentTimeMillis() - queryResult.getTime(); + } + } + }); + } + + /** + * Gets the purge job status for a plugin + * + * @param plugin + * The plugin to get the purge job status for + * @return The purge job statuses + */ + public PurgeJobStatus getJobForPlugin(String plugin) { + final String query = "from " + daoClass.getName() + + " obj where obj.id.plugin='" + plugin + "'"; + return (PurgeJobStatus) txTemplate.execute(new TransactionCallback() { + @Override + public PurgeJobStatus doInTransaction(TransactionStatus status) { + Query hibQuery = getSession(false).createQuery(query); + PurgeJobStatus queryResult = (PurgeJobStatus) hibQuery + .uniqueResult(); + return queryResult; + } + }); + } + + /** + * Sets a purge job to running status and sets the startTime to current + * time. If was previously running, will increment the failed count. + * + * @param plugin + * The plugin row to update + */ + public void startJob(final String plugin) { + final String query = "from " + daoClass.getName() + + " obj where obj.id.plugin='" + plugin + "'"; + txTemplate.execute(new TransactionCallback() { + @Override + public PurgeJobStatus doInTransaction(TransactionStatus status) { + Session sess = getSession(false); + Query hibQuery = sess.createQuery(query); + PurgeJobStatus queryResult = (PurgeJobStatus) hibQuery + .uniqueResult(); + if (queryResult == null) { + queryResult = new PurgeJobStatus(); + queryResult.setFailedCount(0); + queryResult.setPlugin(plugin); + queryResult.setRunning(false); + sess.save(queryResult); + } + + if (queryResult.isRunning()) { + // query was previously running, update failed count + queryResult.incrementFailedCount(); + } + + queryResult.setStartTime(Calendar.getInstance( + TimeZone.getTimeZone("GMT")).getTime()); + queryResult.setRunning(true); + sess.update(queryResult); + return queryResult; + } + }); + } + + /** + * Retrieves the plugins order by startTime. 
+ * + * @param latestStartTime + * @param failedCount + * @return + */ + @SuppressWarnings("unchecked") + public List getPluginsByPurgeTime() { + final String query = "select obj.id.plugin from " + daoClass.getName() + + " obj order by obj.startTime asc, obj.plugin asc"; + return (List) txTemplate.execute(new TransactionCallback() { + @Override + public List doInTransaction(TransactionStatus status) { + Query hibQuery = getSession(false).createQuery(query); + List result = (List) hibQuery.list(); + return result; + } + }); + } + + /** + * Updates a purge job status object + * + * @param jobStatus + * The object to update + */ + public void update(final PurgeJobStatus jobStatus) { + txTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + getHibernateTemplate().update(jobStatus); + } + }); + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java index 7b260d5eb8..d71ca475d1 100644 --- a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java +++ b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeJob.java @@ -1,299 +1,302 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.uf.edex.purgesrv; - -import java.lang.reflect.Method; -import java.sql.SQLException; -import java.util.Calendar; -import java.util.Date; -import java.util.Map; -import java.util.TimeZone; - -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; -import com.raytheon.uf.edex.database.cluster.ClusterTask; -import com.raytheon.uf.edex.database.plugin.PluginDao; -import com.raytheon.uf.edex.database.plugin.PluginFactory; -import com.raytheon.uf.edex.database.purge.PurgeLogger; - -/** - * - * This class encapsulates the purge activity for a plugin into a cluster task. - * - *
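The PurgeDao above builds several HQL strings (getTimeSinceLastPurge, getJobForPlugin, startJob) by concatenating the plugin name directly into the query. Plugin names come from the internal registry, so this works, but binding a named parameter keeps these statements consistent with getRunningClusterJobs and getFailedJobs and avoids any quoting surprises. A sketch of the same lookup with a bound parameter, using the Hibernate Query API already present in this class, is given below; it is an editorial alternative, not part of the change itself.

    public PurgeJobStatus getJobForPlugin(final String plugin) {
        final String query = "from " + daoClass.getName()
                + " obj where obj.id.plugin = :plugin";
        return (PurgeJobStatus) txTemplate.execute(new TransactionCallback() {
            @Override
            public PurgeJobStatus doInTransaction(TransactionStatus status) {
                Query hibQuery = getSession(false).createQuery(query);
                // Bind the plugin name instead of concatenating it into the HQL.
                hibQuery.setString("plugin", plugin);
                return (PurgeJobStatus) hibQuery.uniqueResult();
            }
        });
    }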
- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Apr 19, 2012 #470       bphillip     Initial creation
- * 
- * 
- * - * @author bphillip - * @version 1.0 - */ -public class PurgeJob extends Thread { - - /** The type of purge */ - public enum PURGE_JOB_TYPE { - PURGE_ALL, PURGE_EXPIRED - } - - private long startTime; - - /** The cluster task name to use for purge jobs */ - public static final String TASK_NAME = "Purge Plugin Data"; - - /** The plugin associated with this purge job */ - private String pluginName; - - /** The type of purge job being executed */ - private PURGE_JOB_TYPE purgeType; - - /** Last time job has printed a timed out message */ - private long lastTimeOutMessage = 0; - - /** - * Creates a new Purge job for the specified plugin. - * - * @param pluginName - * The plugin to be purged - * @param purgeType - * The type of purge to be executed - */ - public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) { - // Give the thread a name - this.setName("Purge-" + pluginName.toUpperCase() + "-Thread"); - this.pluginName = pluginName; - this.purgeType = purgeType; - } - - public void run() { - - // Flag used to track if this job has failed - boolean failed = false; - startTime = System.currentTimeMillis(); - PurgeLogger.logInfo("Purging expired data...", pluginName); - PluginDao dao = null; - - try { - dao = PluginFactory.getInstance().getPluginDao(pluginName); - if (dao.getDaoClass() != null) { - dao.purgeExpiredData(); - PurgeLogger.logInfo("Data successfully Purged!", pluginName); - } else { - Method m = dao.getClass().getMethod("purgeExpiredData", - new Class[] {}); - if (m != null) { - if (m.getDeclaringClass().equals(PluginDao.class)) { - PurgeLogger - .logWarn( - "Unable to purge data. This plugin does not specify a record class and does not implement a custom purger.", - pluginName); - } else { - if (this.purgeType.equals(PURGE_JOB_TYPE.PURGE_EXPIRED)) { - dao.purgeExpiredData(); - } else { - dao.purgeAllData(); - } - PurgeLogger.logInfo("Data successfully Purged!", - pluginName); - } - } - } - } catch (Exception e) { - failed = true; - // keep getting next exceptions with sql exceptions to ensure - // we can see the underlying error - PurgeLogger - .logError("Error purging expired data!\n", pluginName, e); - Throwable t = e.getCause(); - while (t != null) { - if (t instanceof SQLException) { - SQLException se = ((SQLException) t).getNextException(); - PurgeLogger.logError("Next exception:", pluginName, se); - } - t = t.getCause(); - } - } finally { - ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock(); - try { - /* - * Update the status accordingly if the purge failed or - * succeeded - */ - PurgeDao purgeDao = new PurgeDao(); - PurgeJobStatus status = purgeDao - .getJobForPlugin(this.pluginName); - if (status == null) { - PurgeLogger.logError( - "Purge job completed but no status object found!", - this.pluginName); - } else { - if (failed) { - status.incrementFailedCount(); - if (status.getFailedCount() >= PurgeManager - .getInstance().getFatalFailureCount()) { - PurgeLogger - .logFatal( - "Purger for this plugin has reached or exceeded consecutive failure limit of " - + PurgeManager - .getInstance() - .getFatalFailureCount() - + ". 
Data will no longer being purged for this plugin.", - pluginName); - } else { - PurgeLogger.logError("Purge job has failed " - + status.getFailedCount() - + " consecutive times.", this.pluginName); - // Reset the start time so we can try again as soon - // as possible - status.setStartTime(new Date(0)); - } - } else { - status.setFailedCount(0); - } - - /* - * This purger thread has exceeded the time out duration but - * finally finished. Output a message and update the status - */ - int deadPurgeJobAge = PurgeManager.getInstance() - .getDeadPurgeJobAge(); - Calendar purgeTimeOutLimit = Calendar.getInstance(); - purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT")); - purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge); - if (startTime < purgeTimeOutLimit.getTimeInMillis()) { - PurgeLogger - .logInfo( - "Purge job has recovered from timed out state!!", - pluginName); - } - status.setRunning(false); - purgeDao.update(status); - /* - * Log execution times - */ - long executionTime = getAge(); - long execTimeInMinutes = executionTime / 60000; - if (execTimeInMinutes > 0) { - PurgeLogger.logInfo("Purge run time: " + executionTime - + " ms (" + execTimeInMinutes + " minutes)", - this.pluginName); - } else { - PurgeLogger.logInfo("Purge run time: " + executionTime - + " ms", this.pluginName); - } - } - } catch (Throwable e) { - PurgeLogger - .logError( - "An unexpected error occurred upon completion of the purge job", - this.pluginName, e); - } finally { - ClusterLockUtils.unlock(purgeLock, false); - } - } - } - - public void printTimedOutMessage(int deadPurgeJobAge) { - // only print message every 5 minutes - if (System.currentTimeMillis() - lastTimeOutMessage > 300000) { - PurgeLogger.logFatal( - "Purger running time has exceeded timeout duration of " - + deadPurgeJobAge - + " minutes. Current running time: " - + (getAge() / 60000) + " minutes", pluginName); - printStackTrace(); - } - } - - /** - * Prints the stack trace for this job thread. 
- */ - public void printStackTrace() { - StringBuffer buffer = new StringBuffer(); - buffer.append("Stack trace for Purge Job Thread:\n"); - buffer.append(getStackTrace(this)); - // If this thread is blocked, output the stack traces for the other - // blocked threads to assist in determining the source of the - // deadlocked - // threads - if (this.getState().equals(State.BLOCKED)) { - buffer.append("\tDUMPING OTHER BLOCKED THREADS\n"); - buffer.append(getBlockedStackTraces()); - - } - PurgeLogger.logError(buffer.toString(), this.pluginName); - - } - - /** - * Gets the stack traces for all other threads in the BLOCKED state in the - * JVM - * - * @return The stack traces for all other threads in the BLOCKED state in - * the JVM - */ - private String getBlockedStackTraces() { - StringBuffer buffer = new StringBuffer(); - Map threads = Thread.getAllStackTraces(); - for (Thread t : threads.keySet()) { - if (t.getState().equals(State.BLOCKED)) { - if (t.getId() != this.getId()) { - buffer.append(getStackTrace(t)); - } - } - } - - return buffer.toString(); - } - - /** - * Gets the stack trace for the given thread - * - * @param thread - * The thread to get the stack trace for - * @return The stack trace as a String - */ - private String getStackTrace(Thread thread) { - StringBuffer buffer = new StringBuffer(); - StackTraceElement[] stack = Thread.getAllStackTraces().get(thread); - buffer.append("\tThread ID: ").append(thread.getId()) - .append(" Thread state: ").append(this.getState()) - .append("\n"); - if (stack == null) { - buffer.append("No stack trace could be retrieved for this thread"); - } else { - for (int i = 0; i < stack.length; i++) { - buffer.append("\t\t").append(stack[i]).append("\n"); - } - } - return buffer.toString(); - } - - public long getStartTime() { - return startTime; - } - - public long getAge() { - return System.currentTimeMillis() - startTime; - } -} +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.purgesrv; + +import java.lang.reflect.Method; +import java.sql.SQLException; +import java.util.Calendar; +import java.util.Date; +import java.util.Map; +import java.util.TimeZone; + +import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; +import com.raytheon.uf.edex.database.cluster.ClusterTask; +import com.raytheon.uf.edex.database.plugin.PluginDao; +import com.raytheon.uf.edex.database.plugin.PluginFactory; +import com.raytheon.uf.edex.database.purge.PurgeLogger; + +/** + * + * This class encapsulates the purge activity for a plugin into a cluster task. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Apr 19, 2012 #470       bphillip     Initial creation
+ * 
+ * 
+ * + * @author bphillip + * @version 1.0 + */ +public class PurgeJob extends Thread { + + /** The type of purge */ + public enum PURGE_JOB_TYPE { + PURGE_ALL, PURGE_EXPIRED + } + + private long startTime; + + /** The cluster task name to use for purge jobs */ + public static final String TASK_NAME = "Purge Plugin Data"; + + /** The plugin associated with this purge job */ + private String pluginName; + + /** The type of purge job being executed */ + private PURGE_JOB_TYPE purgeType; + + /** Last time job has printed a timed out message */ + private long lastTimeOutMessage = 0; + + /** + * Creates a new Purge job for the specified plugin. + * + * @param pluginName + * The plugin to be purged + * @param purgeType + * The type of purge to be executed + */ + public PurgeJob(String pluginName, PURGE_JOB_TYPE purgeType) { + // Give the thread a name + this.setName("Purge-" + pluginName.toUpperCase() + "-Thread"); + this.pluginName = pluginName; + this.purgeType = purgeType; + } + + public void run() { + + // Flag used to track if this job has failed + boolean failed = false; + startTime = System.currentTimeMillis(); + PurgeLogger.logInfo("Purging expired data...", pluginName); + PluginDao dao = null; + + try { + dao = PluginFactory.getInstance().getPluginDao(pluginName); + if (dao.getDaoClass() != null) { + dao.purgeExpiredData(); + PurgeLogger.logInfo("Data successfully Purged!", pluginName); + } else { + Method m = dao.getClass().getMethod("purgeExpiredData", + new Class[] {}); + if (m != null) { + if (m.getDeclaringClass().equals(PluginDao.class)) { + PurgeLogger + .logWarn( + "Unable to purge data. This plugin does not specify a record class and does not implement a custom purger.", + pluginName); + } else { + if (this.purgeType.equals(PURGE_JOB_TYPE.PURGE_EXPIRED)) { + dao.purgeExpiredData(); + } else { + dao.purgeAllData(); + } + PurgeLogger.logInfo("Data successfully Purged!", + pluginName); + } + } + } + } catch (Exception e) { + failed = true; + // keep getting next exceptions with sql exceptions to ensure + // we can see the underlying error + PurgeLogger + .logError("Error purging expired data!\n", pluginName, e); + Throwable t = e.getCause(); + while (t != null) { + if (t instanceof SQLException) { + SQLException se = ((SQLException) t).getNextException(); + PurgeLogger.logError("Next exception:", pluginName, se); + } + t = t.getCause(); + } + } finally { + ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock(); + try { + /* + * Update the status accordingly if the purge failed or + * succeeded + */ + PurgeDao purgeDao = new PurgeDao(); + PurgeJobStatus status = purgeDao + .getJobForPlugin(this.pluginName); + if (status == null) { + PurgeLogger.logError( + "Purge job completed but no status object found!", + this.pluginName); + } else { + if (failed) { + status.incrementFailedCount(); + if (status.getFailedCount() >= PurgeManager + .getInstance().getFatalFailureCount()) { + PurgeLogger + .logFatal( + "Purger for this plugin has reached or exceeded consecutive failure limit of " + + PurgeManager + .getInstance() + .getFatalFailureCount() + + ". 
Data will no longer being purged for this plugin.", + pluginName); + } else { + PurgeLogger.logError("Purge job has failed " + + status.getFailedCount() + + " consecutive times.", this.pluginName); + // Back the start time off by half an hour to try to + // purgin soon, don't want to start immediately so + // it doesn't ping pong between servers in a time + // out scenario + Date startTime = status.getStartTime(); + startTime.setTime(startTime.getTime() - (1800000)); + } + } else { + status.setFailedCount(0); + } + + /* + * This purger thread has exceeded the time out duration but + * finally finished. Output a message and update the status + */ + int deadPurgeJobAge = PurgeManager.getInstance() + .getDeadPurgeJobAge(); + Calendar purgeTimeOutLimit = Calendar.getInstance(); + purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT")); + purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge); + if (startTime < purgeTimeOutLimit.getTimeInMillis()) { + PurgeLogger + .logInfo( + "Purge job has recovered from timed out state!!", + pluginName); + } + status.setRunning(false); + purgeDao.update(status); + /* + * Log execution times + */ + long executionTime = getAge(); + long execTimeInMinutes = executionTime / 60000; + if (execTimeInMinutes > 0) { + PurgeLogger.logInfo("Purge run time: " + executionTime + + " ms (" + execTimeInMinutes + " minutes)", + this.pluginName); + } else { + PurgeLogger.logInfo("Purge run time: " + executionTime + + " ms", this.pluginName); + } + } + } catch (Throwable e) { + PurgeLogger + .logError( + "An unexpected error occurred upon completion of the purge job", + this.pluginName, e); + } finally { + ClusterLockUtils.unlock(purgeLock, false); + } + } + } + + public void printTimedOutMessage(int deadPurgeJobAge) { + // only print message every 5 minutes + if (System.currentTimeMillis() - lastTimeOutMessage > 300000) { + PurgeLogger.logFatal( + "Purger running time has exceeded timeout duration of " + + deadPurgeJobAge + + " minutes. Current running time: " + + (getAge() / 60000) + " minutes", pluginName); + printStackTrace(); + } + } + + /** + * Prints the stack trace for this job thread. 
+ */ + public void printStackTrace() { + StringBuffer buffer = new StringBuffer(); + buffer.append("Stack trace for Purge Job Thread:\n"); + buffer.append(getStackTrace(this)); + // If this thread is blocked, output the stack traces for the other + // blocked threads to assist in determining the source of the + // deadlocked + // threads + if (this.getState().equals(State.BLOCKED)) { + buffer.append("\tDUMPING OTHER BLOCKED THREADS\n"); + buffer.append(getBlockedStackTraces()); + + } + PurgeLogger.logError(buffer.toString(), this.pluginName); + + } + + /** + * Gets the stack traces for all other threads in the BLOCKED state in the + * JVM + * + * @return The stack traces for all other threads in the BLOCKED state in + * the JVM + */ + private String getBlockedStackTraces() { + StringBuffer buffer = new StringBuffer(); + Map threads = Thread.getAllStackTraces(); + for (Thread t : threads.keySet()) { + if (t.getState().equals(State.BLOCKED)) { + if (t.getId() != this.getId()) { + buffer.append(getStackTrace(t)); + } + } + } + + return buffer.toString(); + } + + /** + * Gets the stack trace for the given thread + * + * @param thread + * The thread to get the stack trace for + * @return The stack trace as a String + */ + private String getStackTrace(Thread thread) { + StringBuffer buffer = new StringBuffer(); + StackTraceElement[] stack = Thread.getAllStackTraces().get(thread); + buffer.append("\tThread ID: ").append(thread.getId()) + .append(" Thread state: ").append(this.getState()) + .append("\n"); + if (stack == null) { + buffer.append("No stack trace could be retrieved for this thread"); + } else { + for (int i = 0; i < stack.length; i++) { + buffer.append("\t\t").append(stack[i]).append("\n"); + } + } + return buffer.toString(); + } + + public long getStartTime() { + return startTime; + } + + public long getAge() { + return System.currentTimeMillis() - startTime; + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeManager.java b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeManager.java index 3af330457e..a21bb7fc7e 100644 --- a/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeManager.java +++ b/edexOsgi/com.raytheon.uf.edex.purgesrv/src/com/raytheon/uf/edex/purgesrv/PurgeManager.java @@ -1,480 +1,488 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. 
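In the new PurgeJob above, a non-fatal failure no longer resets the job's start time to the epoch; instead the recorded start time is backed off by 30 minutes (1,800,000 ms) so the purge becomes eligible again soon without immediately bouncing to another cluster member in a time-out scenario. That adjustment mutates the Date returned by status.getStartTime() in place; if that getter ever returned a defensive copy, the adjustment would be lost before purgeDao.update(status) runs. A small sketch of an explicitly set-back form, assuming the PurgeJobStatus getter/setter pair shown elsewhere in this diff:

    // Back the recorded start time off by half an hour so the job is retried
    // soon, but not instantly on another server.
    Date adjusted = new Date(status.getStartTime().getTime() - 30L * 60L * 1000L);
    status.setStartTime(adjusted);
    // purgeDao.update(status) later in the finally block persists the change.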
- **/ -package com.raytheon.uf.edex.purgesrv; - -import java.lang.Thread.State; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.concurrent.ConcurrentHashMap; - -import com.raytheon.uf.edex.core.dataplugin.PluginRegistry; -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; -import com.raytheon.uf.edex.database.cluster.ClusterTask; -import com.raytheon.uf.edex.database.purge.PurgeLogger; -import com.raytheon.uf.edex.database.status.StatusConstants; -import com.raytheon.uf.edex.purgesrv.PurgeJob.PURGE_JOB_TYPE; - -/** - * - * Object for managing purge jobs. The purge manager relies on the purgejobs - * table to coordinate information. The executePurge() method on this class is - * executed every minute via a quartz timer defined in the purge-spring.xml - * Spring configuration file. - *

- * The purge manager is designed to adhere to the following rules: - *

- * · The cluster may have no more than 6 purge jobs running simultaneously by - * default. This property is configurable in the project.properties file
- * · Any given server may have no more than 2 purge jobs running simultaneously - * by default. This property is configurable in the project.properties file
- * · A purge job for a plugin is considered 'hung' if it has been running for - * more than 20 minutes by default. This property is configurable in the - * project.properties file
- * · If a purge job that was previously determined to be hung actually finishes - * it's execution, the cluster lock is updated appropriately and the purge job - * is able to resume normal operation. This is in place so if a hung purge - * process goes unnoticed for a period of time, the server will still try to - * recover autonomously if it can.
- * · If a purge job is determined to be hung, the stack trace for the thread - * executing the job is output to the log. Furthermore, if the job is in the - * BLOCKED state, the stack traces for all other BLOCKED threads is output to - * the purge log as part of a rudimentary deadlock detection strategy to be used - * by personnel attempting to remedy the situation.
- * · By default, a fatal condition occurs if a given plugin's purge job fails 3 - * consecutive times.
- * · If a purge job hangs on one server in the cluster, it will try and run on - * another cluster member at the next purge interval.
- * · If the purge manager attempts to purge a plugin that has been running for - * longer than the 20 minute threshold, it is considered a failure, and the - * failure count is updated. - *

- * - * - *

- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Apr 18, 2012 #470       bphillip    Initial creation
- * 
- * 
- * - * @author bphillip - * @version 1.0 - */ -public class PurgeManager { - - /** Purge Manager task name */ - private static final String PURGE_TASK_NAME = "Purge Manager"; - - /** Purge Manager task details */ - private static final String PURGE_TASK_DETAILS = "Purge Manager Job"; - - /** Purge Manager task override timeout. Currently 2 minutes */ - private static final long PURGE_MANAGER_TIMEOUT = 120000; - - /** - * The cluster limit property to be set via Spring with the value defined in - * project.properties - */ - private int clusterLimit = 6; - - /** - * The server limit property to be set via Spring with the value defined in - * project.properties - */ - private int serverLimit = 2; - - /** - * The time in minutes at which a purge job is considered 'dead' or 'hung' - * set via Spring with the value defined in project.properties - */ - private int deadPurgeJobAge = 20; - - /** - * The frequency, in minutes, that a plugin may be purged set via Spring - * with the value defined in project.properties - */ - private int purgeFrequency = 60; - - /** - * How many times a purger is allowed to fail before it is considered fatal. - * Set via Spring with the value defined in project.properties - */ - private int fatalFailureCount = 3; - - /** - * The master switch defined in project.properties that enables and disables - * data purging - */ - private boolean purgeEnabled = true; - - /** Map of purge jobs */ - private Map purgeJobs = new ConcurrentHashMap(); - - private PurgeDao dao = new PurgeDao(); - - private static PurgeManager instance = new PurgeManager(); - - public static PurgeManager getInstance() { - return instance; - } - - /** - * Creates a new PurgeManager - */ - private PurgeManager() { - } - - /** - * Executes the purge routine - */ - public void executePurge() { - if (!purgeEnabled) { - PurgeLogger.logWarn( - "Data purging has been disabled. 
No data will be purged.", - null); - return; - } - - ClusterTask purgeMgrTask = getPurgeLock(); - - try { - // Prune the job map - Iterator iter = purgeJobs.values().iterator(); - while (iter.hasNext()) { - if (!iter.next().isAlive()) { - iter.remove(); - } - } - Calendar purgeTimeOutLimit = Calendar.getInstance(); - purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT")); - purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge); - Calendar purgeFrequencyLimit = Calendar.getInstance(); - purgeFrequencyLimit.setTimeZone(TimeZone.getTimeZone("GMT")); - purgeFrequencyLimit.add(Calendar.MINUTE, -purgeFrequency); - - // Gets the list of plugins in ascending order by the last time they - // were purged - List pluginList = dao.getPluginsByPurgeTime(); - - // check for any new plugins or database being purged and needing - // entries recreated - Set availablePlugins = new HashSet(PluginRegistry - .getInstance().getRegisteredObjects()); - - // Merge the lists - availablePlugins.removeAll(pluginList); - - if (availablePlugins.size() > 0) { - // generate new list with them at the beginning - List newSortedPlugins = new ArrayList( - availablePlugins); - Collections.sort(newSortedPlugins); - newSortedPlugins.addAll(pluginList); - pluginList = newSortedPlugins; - } - - boolean canPurge = true; - int jobsStarted = 0; - int maxNumberOfJobsToStart = Math.min( - clusterLimit - - dao.getRunningClusterJobs( - purgeTimeOutLimit.getTime(), - fatalFailureCount), serverLimit - - getNumberRunningJobsOnServer(purgeTimeOutLimit)); - for (String plugin : pluginList) { - try { - // initialize canPurge based on number of jobs started - canPurge = jobsStarted < maxNumberOfJobsToStart; - PurgeJob jobThread = purgeJobs.get(plugin); - PurgeJobStatus job = dao.getJobForPlugin(plugin); - - if (job == null) { - // no job on server, generate empty job - - try { - job = new PurgeJobStatus(); - job.setPlugin(plugin); - job.setFailedCount(0); - job.setRunning(false); - job.setStartTime(new Date(0)); - dao.create(job); - } catch (Throwable e) { - PurgeLogger.logError( - "Failed to create new purge job entry", - plugin, e); - } - } - - // Check to see if this job has met the fatal failure count - if (job.getFailedCount() >= fatalFailureCount) { - canPurge = false; - PurgeLogger - .logFatal( - "Purger for this plugin has reached or exceeded consecutive failure limit of " - + fatalFailureCount - + ". 
Data will no longer being purged for this plugin.", - plugin); - } - - if (job.isRunning()) { - // check if job has timed out - if (purgeTimeOutLimit.getTimeInMillis() > job - .getStartTime().getTime()) { - // if no one else sets canPurge = false will start - // purging on this server - if (jobThread != null) { - // job currently running on our server, don't - // start another - canPurge = false; - jobThread.printTimedOutMessage(deadPurgeJobAge); - } - } - } else { - // not currently running, check if need to be purged - Date startTime = job.getStartTime(); - if (startTime != null - && startTime.getTime() >= purgeFrequencyLimit - .getTimeInMillis()) { - canPurge = false; - } - } - - if (canPurge) { - purgeJobs.put(plugin, purgeExpiredData(plugin)); - jobsStarted++; - } - } catch (Throwable e) { - PurgeLogger - .logError( - "An unexpected error occured during the purge job check for plugin", - plugin, e); - } - } - } catch (Throwable e) { - PurgeLogger - .logError( - "An unexpected error occured during the data purge process", - StatusConstants.CATEGORY_PURGE, e); - } finally { - // Unlock the purge task to allow other servers to run. - ClusterLockUtils.unlock(purgeMgrTask, false); - // PurgeLogger.logInfo(getPurgeStatus(true), null); - } - } - - @SuppressWarnings("unused") - private String getPurgeStatus(boolean verbose) { - Calendar purgeTimeOutLimit = Calendar.getInstance(); - purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT")); - purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge); - - StringBuilder builder = new StringBuilder(); - List failedJobs = dao.getFailedJobs(fatalFailureCount); - - List timedOutJobs = dao - .getTimedOutJobs(purgeTimeOutLimit.getTime()); - int clusterJobs = dao.getRunningClusterJobs( - purgeTimeOutLimit.getTime(), fatalFailureCount); - Map> serverMap = dao - .getRunningServerJobs(); - builder.append("\nPURGE JOB STATUS:"); - builder.append("\n\tTotal Jobs Running On Cluster: ").append( - clusterJobs); - List jobs = null; - for (String server : serverMap.keySet()) { - jobs = serverMap.get(server); - builder.append("\n\tJobs Running On ").append(server).append(": ") - .append(jobs.size()); - if (verbose && !jobs.isEmpty()) { - builder.append(" Plugins: "); - for (int i = 0; i < jobs.size(); i++) { - builder.append(jobs.get(i).getPlugin()); - if (i != jobs.size() - 1) { - builder.append(","); - } - } - } - } - if (verbose) { - builder.append("\n\tFailed Jobs: "); - if (failedJobs.isEmpty()) { - builder.append("0"); - } else { - PurgeJobStatus currentJob = null; - for (int i = 0; i < failedJobs.size(); i++) { - currentJob = failedJobs.get(i); - builder.append(currentJob.getPlugin()); - if (i != failedJobs.size() - 1) { - builder.append(","); - } - } - } - - builder.append("\n\tTimed Out Jobs: "); - if (timedOutJobs.isEmpty()) { - builder.append("0"); - } else { - PurgeJobStatus currentJob = null; - for (int i = 0; i < timedOutJobs.size(); i++) { - currentJob = timedOutJobs.get(i); - builder.append(currentJob.getPlugin()); - if (i != timedOutJobs.size() - 1) { - builder.append(","); - } - } - } - } - return builder.toString(); - } - - public ClusterTask getPurgeLock() { - // Lock so only one cluster member may start purge processes - ClusterTask purgeMgrTask = ClusterLockUtils.lock(PURGE_TASK_NAME, - PURGE_TASK_DETAILS, PURGE_MANAGER_TIMEOUT, true); - - LockState purgeMgrLockState = purgeMgrTask.getLockState(); - switch (purgeMgrLockState) { - case FAILED: - PurgeLogger.logError( - "Purge Manager failed to acquire cluster task lock", - 
StatusConstants.CATEGORY_PURGE); - return null; - case OLD: - PurgeLogger.logWarn("Purge Manager acquired old cluster task lock", - StatusConstants.CATEGORY_PURGE); - break; - case ALREADY_RUNNING: - PurgeLogger - .logWarn( - "Purge Manager acquired currently running cluster task lock", - StatusConstants.CATEGORY_PURGE); - return null; - case SUCCESSFUL: - break; - } - return purgeMgrTask; - } - - private int getNumberRunningJobsOnServer(Calendar timeOutTime) { - int rval = 0; - for (PurgeJob job : purgeJobs.values()) { - // if job has not timed out or if the job is not blocked consider it - // running on this server - if (timeOutTime.getTimeInMillis() < job.getStartTime() - || !job.getState().equals(State.BLOCKED)) { - rval++; - } - - } - return rval; - } - - /** - * Starts a purge expired data job for the specified plugin. Using this - * method allows for exceeding failure count via a manual purge as well as - * kicking off a second purge for one already running on a server. - * - * @param plugin - * The plugin to purge the expired data for - * @return The PurgeJob that was started - */ - public PurgeJob purgeExpiredData(String plugin) { - dao.startJob(plugin); - PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_EXPIRED); - job.start(); - return job; - } - - /** - * Starts a purge all data job for the specified plugin. Using this method - * allows for exceeding failure count via a manual purge as well as kicking - * off a second purge for one already running on a server. - * - * @param plugin - * The plugin to purge all data for - * @return The PurgeJob that was started - */ - public PurgeJob purgeAllData(String plugin) { - dao.startJob(plugin); - PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_ALL); - job.start(); - return job; - } - - public int getClusterLimit() { - return clusterLimit; - } - - public void setClusterLimit(int clusterLimit) { - this.clusterLimit = clusterLimit; - } - - public int getServerLimit() { - return serverLimit; - } - - public void setServerLimit(int serverLimit) { - this.serverLimit = serverLimit; - } - - public int getDeadPurgeJobAge() { - return deadPurgeJobAge; - } - - public void setDeadPurgeJobAge(int deadPurgeJobAge) { - this.deadPurgeJobAge = deadPurgeJobAge; - } - - public int getPurgeFrequency() { - return purgeFrequency; - } - - public void setPurgeFrequency(int purgeFrequency) { - this.purgeFrequency = purgeFrequency; - } - - public int getFatalFailureCount() { - return this.fatalFailureCount; - } - - public void setFatalFailureCount(int fatalFailureCount) { - this.fatalFailureCount = fatalFailureCount; - } - - public void setPurgeEnabled(boolean purgeEnabled) { - this.purgeEnabled = purgeEnabled; - } - - public boolean getPurgeEnabled() { - return purgeEnabled; - } -} +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. 
+ **/ +package com.raytheon.uf.edex.purgesrv; + +import java.lang.Thread.State; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; + +import com.raytheon.uf.edex.core.dataplugin.PluginRegistry; +import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; +import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; +import com.raytheon.uf.edex.database.cluster.ClusterTask; +import com.raytheon.uf.edex.database.purge.PurgeLogger; +import com.raytheon.uf.edex.database.status.StatusConstants; +import com.raytheon.uf.edex.purgesrv.PurgeJob.PURGE_JOB_TYPE; + +/** + * + * Object for managing purge jobs. The purge manager relies on the purgejobs + * table to coordinate information. The executePurge() method on this class is + * executed every minute via a quartz timer defined in the purge-spring.xml + * Spring configuration file. + *

+ * The purge manager is designed to adhere to the following rules: + *

+ * · The cluster may have no more than 6 purge jobs running simultaneously by + * default. This property is configurable in the project.properties file
+ * · Any given server may have no more than 2 purge jobs running simultaneously + * by default. This property is configurable in the project.properties file.
+ * · A purge job for a plugin is considered 'hung' if it has been running for + * more than 20 minutes by default. This property is configurable in the + * project.properties file.
+ * · If a purge job that was previously determined to be hung actually finishes + * its execution, the cluster lock is updated appropriately and the purge job + * is able to resume normal operation. This is in place so that if a hung purge + * process goes unnoticed for a period of time, the server will still try to + * recover autonomously if it can.
+ * · If a purge job is determined to be hung, the stack trace for the thread + * executing the job is output to the log. Furthermore, if the job is in the + * BLOCKED state, the stack traces for all other BLOCKED threads are output to + * the purge log as part of a rudimentary deadlock detection strategy to be used + * by personnel attempting to remedy the situation.
+ * · By default, a fatal condition occurs if a given plugin's purge job fails 3 + * consecutive times.
+ * · If a purge job hangs on one server in the cluster, it will try to run on + * another cluster member at the next purge interval.
+ * · If the purge manager attempts to purge a plugin whose purge job has been + * running for longer than the 20-minute threshold, it is considered a failure, + * and the failure count is updated. + *

+ * + * + *

+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Apr 18, 2012 #470       bphillip    Initial creation
+ * 
+ * 
+ * + * @author bphillip + * @version 1.0 + */ +public class PurgeManager { + + /** Purge Manager task name */ + private static final String PURGE_TASK_NAME = "Purge Manager"; + + /** Purge Manager task details */ + private static final String PURGE_TASK_DETAILS = "Purge Manager Job"; + + /** Purge Manager task override timeout. Currently 2 minutes */ + private static final long PURGE_MANAGER_TIMEOUT = 120000; + + /** + * The cluster limit property to be set via Spring with the value defined in + * project.properties + */ + private int clusterLimit = 6; + + /** + * The server limit property to be set via Spring with the value defined in + * project.properties + */ + private int serverLimit = 2; + + /** + * The time in minutes at which a purge job is considered 'dead' or 'hung' + * set via Spring with the value defined in project.properties + */ + private int deadPurgeJobAge = 20; + + /** + * The frequency, in minutes, that a plugin may be purged set via Spring + * with the value defined in project.properties + */ + private int purgeFrequency = 60; + + /** + * How many times a purger is allowed to fail before it is considered fatal. + * Set via Spring with the value defined in project.properties + */ + private int fatalFailureCount = 3; + + /** + * The master switch defined in project.properties that enables and disables + * data purging + */ + private boolean purgeEnabled = true; + + /** Map of purge jobs */ + private Map purgeJobs = new ConcurrentHashMap(); + + private PurgeDao dao = new PurgeDao(); + + private static PurgeManager instance = new PurgeManager(); + + public static PurgeManager getInstance() { + return instance; + } + + /** + * Creates a new PurgeManager + */ + private PurgeManager() { + } + + /** + * Executes the purge routine + */ + public void executePurge() { + if (!purgeEnabled) { + PurgeLogger.logWarn( + "Data purging has been disabled. 
No data will be purged.", + null); + return; + } + + ClusterTask purgeMgrTask = getPurgeLock(); + + try { + // Prune the job map + Iterator iter = purgeJobs.values().iterator(); + while (iter.hasNext()) { + if (!iter.next().isAlive()) { + iter.remove(); + } + } + + Calendar purgeTimeOutLimit = Calendar.getInstance(); + purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT")); + purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge); + Calendar purgeFrequencyLimit = Calendar.getInstance(); + purgeFrequencyLimit.setTimeZone(TimeZone.getTimeZone("GMT")); + purgeFrequencyLimit.add(Calendar.MINUTE, -purgeFrequency); + + // Gets the list of plugins in ascending order by the last time they + // were purged + List pluginList = dao.getPluginsByPurgeTime(); + + // check for any new plugins or database being purged and needing + // entries recreated + Set availablePlugins = new HashSet(PluginRegistry + .getInstance().getRegisteredObjects()); + + // Merge the lists + availablePlugins.removeAll(pluginList); + + if (availablePlugins.size() > 0) { + // generate new list with them at the beginning + List newSortedPlugins = new ArrayList( + availablePlugins); + Collections.sort(newSortedPlugins); + newSortedPlugins.addAll(pluginList); + pluginList = newSortedPlugins; + } + + boolean canPurge = true; + int jobsStarted = 0; + int maxNumberOfJobsToStart = Math.min( + clusterLimit + - dao.getRunningClusterJobs( + purgeTimeOutLimit.getTime(), + fatalFailureCount), serverLimit + - getNumberRunningJobsOnServer(purgeTimeOutLimit)); + for (String plugin : pluginList) { + try { + // initialize canPurge based on number of jobs started + canPurge = jobsStarted < maxNumberOfJobsToStart; + PurgeJob jobThread = purgeJobs.get(plugin); + PurgeJobStatus job = dao.getJobForPlugin(plugin); + + if (job == null) { + // no job in database, generate empty job + + try { + job = new PurgeJobStatus(); + job.setPlugin(plugin); + job.setFailedCount(0); + job.setRunning(false); + job.setStartTime(new Date(0)); + dao.create(job); + } catch (Throwable e) { + PurgeLogger.logError( + "Failed to create new purge job entry", + plugin, e); + } + } + + // Check to see if this job has met the fatal failure count + if (job.getFailedCount() >= fatalFailureCount) { + canPurge = false; + PurgeLogger + .logFatal( + "Purger for this plugin has reached or exceeded consecutive failure limit of " + + fatalFailureCount + + ". 
Data will no longer being purged for this plugin.", + plugin); + } + + // is purge job currently running on this server + if (jobThread != null) { + // job currently running on our server, don't start + // another + canPurge = false; + + if (purgeTimeOutLimit.getTimeInMillis() > jobThread + .getStartTime()) { + jobThread.printTimedOutMessage(deadPurgeJobAge); + } + } else { + if (job.isRunning()) { + // check if job has timed out + if (purgeTimeOutLimit.getTime().before( + job.getStartTime())) { + canPurge = false; + } + // else if no one else sets canPurge = false will + // start purging on this server + } else { + // not currently running, check if need to be purged + Date startTime = job.getStartTime(); + if (startTime != null + && startTime.after(purgeFrequencyLimit + .getTime())) { + canPurge = false; + } + } + } + + if (canPurge) { + purgeJobs.put(plugin, purgeExpiredData(plugin)); + jobsStarted++; + } + } catch (Throwable e) { + PurgeLogger + .logError( + "An unexpected error occured during the purge job check for plugin", + plugin, e); + } + } + } catch (Throwable e) { + PurgeLogger + .logError( + "An unexpected error occured during the data purge process", + StatusConstants.CATEGORY_PURGE, e); + } finally { + // Unlock the purge task to allow other servers to run. + ClusterLockUtils.unlock(purgeMgrTask, false); + // PurgeLogger.logInfo(getPurgeStatus(true), null); + } + } + + @SuppressWarnings("unused") + private String getPurgeStatus(boolean verbose) { + Calendar purgeTimeOutLimit = Calendar.getInstance(); + purgeTimeOutLimit.setTimeZone(TimeZone.getTimeZone("GMT")); + purgeTimeOutLimit.add(Calendar.MINUTE, -deadPurgeJobAge); + + StringBuilder builder = new StringBuilder(); + List failedJobs = dao.getFailedJobs(fatalFailureCount); + + List timedOutJobs = dao + .getTimedOutJobs(purgeTimeOutLimit.getTime()); + int clusterJobs = dao.getRunningClusterJobs( + purgeTimeOutLimit.getTime(), fatalFailureCount); + Map> serverMap = dao + .getRunningServerJobs(); + builder.append("\nPURGE JOB STATUS:"); + builder.append("\n\tTotal Jobs Running On Cluster: ").append( + clusterJobs); + List jobs = null; + for (String server : serverMap.keySet()) { + jobs = serverMap.get(server); + builder.append("\n\tJobs Running On ").append(server).append(": ") + .append(jobs.size()); + if (verbose && !jobs.isEmpty()) { + builder.append(" Plugins: "); + for (int i = 0; i < jobs.size(); i++) { + builder.append(jobs.get(i).getPlugin()); + if (i != jobs.size() - 1) { + builder.append(","); + } + } + } + } + if (verbose) { + builder.append("\n\tFailed Jobs: "); + if (failedJobs.isEmpty()) { + builder.append("0"); + } else { + PurgeJobStatus currentJob = null; + for (int i = 0; i < failedJobs.size(); i++) { + currentJob = failedJobs.get(i); + builder.append(currentJob.getPlugin()); + if (i != failedJobs.size() - 1) { + builder.append(","); + } + } + } + + builder.append("\n\tTimed Out Jobs: "); + if (timedOutJobs.isEmpty()) { + builder.append("0"); + } else { + PurgeJobStatus currentJob = null; + for (int i = 0; i < timedOutJobs.size(); i++) { + currentJob = timedOutJobs.get(i); + builder.append(currentJob.getPlugin()); + if (i != timedOutJobs.size() - 1) { + builder.append(","); + } + } + } + } + return builder.toString(); + } + + public ClusterTask getPurgeLock() { + // Lock so only one cluster member may start purge processes + ClusterTask purgeMgrTask = ClusterLockUtils.lock(PURGE_TASK_NAME, + PURGE_TASK_DETAILS, PURGE_MANAGER_TIMEOUT, true); + + LockState purgeMgrLockState = purgeMgrTask.getLockState(); + 
switch (purgeMgrLockState) { + case FAILED: + PurgeLogger.logError( + "Purge Manager failed to acquire cluster task lock", + StatusConstants.CATEGORY_PURGE); + return null; + case OLD: + PurgeLogger.logWarn("Purge Manager acquired old cluster task lock", + StatusConstants.CATEGORY_PURGE); + break; + case ALREADY_RUNNING: + PurgeLogger + .logWarn( + "Purge Manager acquired currently running cluster task lock", + StatusConstants.CATEGORY_PURGE); + return null; + case SUCCESSFUL: + break; + } + return purgeMgrTask; + } + + private int getNumberRunningJobsOnServer(Calendar timeOutTime) { + int rval = 0; + for (PurgeJob job : purgeJobs.values()) { + // if job has not timed out or if the job is not blocked consider it + // running on this server + if (timeOutTime.getTimeInMillis() < job.getStartTime() + || !job.getState().equals(State.BLOCKED)) { + rval++; + } + + } + return rval; + } + + /** + * Starts a purge expired data job for the specified plugin. Using this + * method allows for exceeding failure count via a manual purge as well as + * kicking off a second purge for one already running on a server. + * + * @param plugin + * The plugin to purge the expired data for + * @return The PurgeJob that was started + */ + public PurgeJob purgeExpiredData(String plugin) { + dao.startJob(plugin); + PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_EXPIRED); + job.start(); + return job; + } + + /** + * Starts a purge all data job for the specified plugin. Using this method + * allows for exceeding failure count via a manual purge as well as kicking + * off a second purge for one already running on a server. + * + * @param plugin + * The plugin to purge all data for + * @return The PurgeJob that was started + */ + public PurgeJob purgeAllData(String plugin) { + dao.startJob(plugin); + PurgeJob job = new PurgeJob(plugin, PURGE_JOB_TYPE.PURGE_ALL); + job.start(); + return job; + } + + public int getClusterLimit() { + return clusterLimit; + } + + public void setClusterLimit(int clusterLimit) { + this.clusterLimit = clusterLimit; + } + + public int getServerLimit() { + return serverLimit; + } + + public void setServerLimit(int serverLimit) { + this.serverLimit = serverLimit; + } + + public int getDeadPurgeJobAge() { + return deadPurgeJobAge; + } + + public void setDeadPurgeJobAge(int deadPurgeJobAge) { + this.deadPurgeJobAge = deadPurgeJobAge; + } + + public int getPurgeFrequency() { + return purgeFrequency; + } + + public void setPurgeFrequency(int purgeFrequency) { + this.purgeFrequency = purgeFrequency; + } + + public int getFatalFailureCount() { + return this.fatalFailureCount; + } + + public void setFatalFailureCount(int fatalFailureCount) { + this.fatalFailureCount = fatalFailureCount; + } + + public void setPurgeEnabled(boolean purgeEnabled) { + this.purgeEnabled = purgeEnabled; + } + + public boolean getPurgeEnabled() { + return purgeEnabled; + } +} diff --git a/rpms/awips2.edex/deploy.builder/build.sh b/rpms/awips2.edex/deploy.builder/build.sh old mode 100755 new mode 100644
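To make the job-limit arithmetic in the PurgeManager hunk above easier to follow, here is a minimal Java sketch (not part of the diff) showing how the cluster-wide and per-server caps described in the class comment bound the number of purge jobs started in a single executePurge() pass. The running-job counts are assumed placeholder values standing in for dao.getRunningClusterJobs(...) and getNumberRunningJobsOnServer(...).

int clusterLimit = 6;        // cluster-wide cap, injected via Spring from project.properties
int serverLimit = 2;         // per-server cap, injected via Spring from project.properties
int runningOnCluster = 4;    // assumed: non-hung purge jobs currently running across the cluster
int runningOnThisServer = 1; // assumed: purge jobs currently running in this JVM
int maxNumberOfJobsToStart = Math.min(clusterLimit - runningOnCluster,
        serverLimit - runningOnThisServer); // min(2, 1): at most one new job this pass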
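The per-plugin eligibility decision inside executePurge() is spread across several nested branches, so the sketch below (again not part of the diff) restates it as a standalone helper, assuming it lives inside PurgeManager and can see the fatalFailureCount field and the PurgeJobStatus accessors used in the hunk. The name canStartPurge is invented for illustration only.

// Sketch only: mirrors the checks in executePurge(), not a drop-in replacement.
private boolean canStartPurge(PurgeJobStatus job, boolean runningOnThisServer,
        Calendar purgeTimeOutLimit, Calendar purgeFrequencyLimit) {
    if (job.getFailedCount() >= fatalFailureCount) {
        return false; // fatal: plugin has hit the consecutive-failure limit
    }
    if (runningOnThisServer) {
        return false; // a purge thread for this plugin is already active in this JVM
    }
    if (job.isRunning()) {
        // Running on another server: only eligible if it started before the hung threshold.
        return !purgeTimeOutLimit.getTime().before(job.getStartTime());
    }
    // Idle: purge only if the last run started before the purge-frequency window.
    java.util.Date lastStart = job.getStartTime();
    return lastStart == null || !lastStart.after(purgeFrequencyLimit.getTime());
}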
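The class comment states that the limits are set via Spring using values from project.properties. Neither purge-spring.xml nor project.properties appears in this diff, so purely as a hypothetical illustration, the setters added in the hunk could be driven like this with the default values shown above:

PurgeManager purgeManager = PurgeManager.getInstance();
purgeManager.setClusterLimit(6);      // cluster-wide job cap
purgeManager.setServerLimit(2);       // per-server job cap
purgeManager.setDeadPurgeJobAge(20);  // minutes before a purge job is treated as hung
purgeManager.setPurgeFrequency(60);   // minutes between purges of the same plugin
purgeManager.setFatalFailureCount(3); // consecutive failures before the condition is fatal
purgeManager.setPurgeEnabled(true);   // master purge switch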
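getPurgeLock() returns null when the cluster lock comes back FAILED or ALREADY_RUNNING, and a usable ClusterTask for the OLD and SUCCESSFUL states. A hedged usage sketch (not taken from the diff) for a caller that skips a pass when the lock is unavailable and otherwise releases it the way executePurge() does:

ClusterTask purgeLock = PurgeManager.getInstance().getPurgeLock();
if (purgeLock != null) {
    try {
        // ... do work that requires holding the purge cluster lock ...
    } finally {
        // Release the lock so other cluster members may run.
        ClusterLockUtils.unlock(purgeLock, false);
    }
}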