diff --git a/edexOsgi/com.raytheon.uf.edex.activetable/src/com/raytheon/uf/edex/activetable/ActiveTable.java b/edexOsgi/com.raytheon.uf.edex.activetable/src/com/raytheon/uf/edex/activetable/ActiveTable.java index 8f821b2ed6..32f1b8adf6 100644 --- a/edexOsgi/com.raytheon.uf.edex.activetable/src/com/raytheon/uf/edex/activetable/ActiveTable.java +++ b/edexOsgi/com.raytheon.uf.edex.activetable/src/com/raytheon/uf/edex/activetable/ActiveTable.java @@ -61,6 +61,10 @@ import com.raytheon.uf.common.util.CollectionUtil; import com.raytheon.uf.common.util.FileUtil; import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.database.DataAccessLayerException; +import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; +import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; +import com.raytheon.uf.edex.database.cluster.ClusterTask; +import com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler; import com.raytheon.uf.edex.database.dao.CoreDao; import com.raytheon.uf.edex.database.dao.DaoConfig; import com.raytheon.uf.edex.database.query.DatabaseQuery; @@ -97,6 +101,8 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery; * in updateActiveTable() so records will * be updated correctly. * Feb 05, 2015 4099 randerso Fixed latest ETN query for year-end + * Feb 23, 2015 4127 dgilling Use cluster locking to only allow 1 active + * table write at a time. * * * @@ -111,6 +117,10 @@ public class ActiveTable { private static final Logger changeLog = Logger .getLogger("ActiveTableChange"); + private static final String ACTIVE_TABLE_LOCK_NAME = "ActiveTableWriteLock"; + + private static final long DEFAULT_LOCK_TIMEOUT = 5 * TimeUtil.MILLIS_PER_MINUTE; + private static ThreadLocal threadLocalPythonScript = new ThreadLocal() { /* @@ -285,7 +295,7 @@ public class ActiveTable { */ private void updateActiveTable(String siteId, List newRecords, float offsetSecs) { - if (newRecords.size() > 0) { + if (!newRecords.isEmpty()) { ActiveTableMode mode = ActiveTableMode.PRACTICE; if (newRecords.get(0) instanceof OperationalActiveTableRecord) { mode = ActiveTableMode.OPERATIONAL; @@ -296,36 +306,67 @@ public class ActiveTable { IPerformanceStatusHandler perfStat = PerformanceStatus .getHandler("ActiveTable"); ITimer timer = TimeUtil.getTimer(); - timer.start(); - List activeTable = getActiveTable(issueSiteId, mode); - timer.stop(); - perfStat.logDuration("getActiveTable", timer.getElapsedTime()); + MergeResult result = null; + ClusterTask writeLock = null; + try { + boolean logFirst = true; + timer.start(); + do { + if (logFirst) { + statusHandler + .info("updateActiveTable() waiting on lock [" + + ACTIVE_TABLE_LOCK_NAME + ":" + + mode.toString() + "]."); + logFirst = false; + } + writeLock = ClusterLockUtils.lock(ACTIVE_TABLE_LOCK_NAME, + mode.toString(), new CurrentTimeClusterLockHandler( + DEFAULT_LOCK_TIMEOUT, false), true); + } while (!writeLock.getLockState().equals(LockState.SUCCESSFUL)); + statusHandler + .info("updateActiveTable() obtained lock [" + + ACTIVE_TABLE_LOCK_NAME + ":" + + mode.toString() + "]."); + timer.stop(); + perfStat.logDuration("getLock", timer.getElapsedTime()); - // get decoder sites to see if we need to backup active table - Set decoderSites = getDecoderSites(siteId); - - // if any new record is from one of the decoder sites - // we need to queue a backup - for (ActiveTableRecord rec : newRecords) { - if (decoderSites.contains(rec.getOfficeid())) { - ActiveTableBackup.queue(mode, activeTable); - break; + timer.reset(); + timer.start(); + List activeTable = getActiveTable( + issueSiteId, mode); + timer.stop(); + perfStat.logDuration("getActiveTable", timer.getElapsedTime()); + // get decoder sites to see if we need to backup active table + Set decoderSites = getDecoderSites(siteId); + // if any new record is from one of the decoder sites + // we need to queue a backup + for (ActiveTableRecord rec : newRecords) { + if (decoderSites.contains(rec.getOfficeid())) { + ActiveTableBackup.queue(mode, activeTable); + break; + } + } + timer.reset(); + timer.start(); + result = filterTable(siteId, activeTable, newRecords, mode, + offsetSecs); + timer.stop(); + perfStat.logDuration("filterTable", timer.getElapsedTime()); + timer.reset(); + timer.start(); + updateTable(siteId, result, mode); + timer.stop(); + perfStat.logDuration("updateTable", timer.getElapsedTime()); + } finally { + if (writeLock != null) { + statusHandler.info("updateActiveTable() released lock [" + + ACTIVE_TABLE_LOCK_NAME + ":" + mode.toString() + + "]."); + ClusterLockUtils.unlock(writeLock, true); } } - timer.reset(); - timer.start(); - MergeResult result = filterTable(siteId, activeTable, newRecords, - mode, offsetSecs); - timer.stop(); - perfStat.logDuration("filterTable", timer.getElapsedTime()); - - timer.reset(); - timer.start(); - updateTable(siteId, result, mode); - timer.stop(); - perfStat.logDuration("updateTable", timer.getElapsedTime()); - if (result.changeList.size() > 0) { + if (!result.changeList.isEmpty()) { sendNotification(mode, result.changeList, "VTECDecoder"); } } @@ -403,54 +444,52 @@ public class ActiveTable { ActiveTableMode mode, String phensigList, String action, String etn, Calendar currentTime, boolean requestValidTimes, boolean latestEtn) { - synchronized (ActiveTable.class) { - DatabaseQuery query = null; - CoreDao dao = null; + DatabaseQuery query = null; + CoreDao dao = null; - if (mode.equals(ActiveTableMode.OPERATIONAL)) { - query = new DatabaseQuery(OperationalActiveTableRecord.class); - dao = operationalDao; - } else { - query = new DatabaseQuery(PracticeActiveTableRecord.class); - dao = practiceDao; - } - - if (phensigList != null) { - query.addQueryParam("phensig", phensigList, "in"); - } - - if (action != null) { - query.addQueryParam("act", action, "in"); - } - - if (etn != null) { - query.addQueryParam("etn", etn, "in"); - } - - if (requestValidTimes && currentTime != null) { - // Current Time - query.addQueryParam("endTime", currentTime, "greater_than"); - } - if (latestEtn && currentTime != null) { - Calendar yearStart = Calendar.getInstance(); - yearStart.set(currentTime.get(Calendar.YEAR), Calendar.JANUARY, - 1, 0, 0); - query.addQueryParam("issueTime", yearStart, "greater_than"); - query.addOrder("etn", false); - query.setMaxResults(1); - } - - query.addQueryParam("officeid", siteId, "in"); - - List result = null; - try { - result = (List) dao.queryByCriteria(query); - } catch (DataAccessLayerException e) { - statusHandler.handle(Priority.PROBLEM, - "Error querying active table for site " + siteId, e); - } - return result; + if (mode.equals(ActiveTableMode.OPERATIONAL)) { + query = new DatabaseQuery(OperationalActiveTableRecord.class); + dao = operationalDao; + } else { + query = new DatabaseQuery(PracticeActiveTableRecord.class); + dao = practiceDao; } + + if (phensigList != null) { + query.addQueryParam("phensig", phensigList, "in"); + } + + if (action != null) { + query.addQueryParam("act", action, "in"); + } + + if (etn != null) { + query.addQueryParam("etn", etn, "in"); + } + + if (requestValidTimes && currentTime != null) { + // Current Time + query.addQueryParam("endTime", currentTime, "greater_than"); + } + if (latestEtn && currentTime != null) { + Calendar yearStart = Calendar.getInstance(); + yearStart.set(currentTime.get(Calendar.YEAR), Calendar.JANUARY, 1, + 0, 0); + query.addQueryParam("issueTime", yearStart, "greater_than"); + query.addOrder("etn", false); + query.setMaxResults(1); + } + + query.addQueryParam("officeid", siteId, "in"); + + List result = null; + try { + result = (List) dao.queryByCriteria(query); + } catch (DataAccessLayerException e) { + statusHandler.handle(Priority.PROBLEM, + "Error querying active table for site " + siteId, e); + } + return result; } /** @@ -463,23 +502,12 @@ public class ActiveTable { */ private static void updateTable(String siteId, MergeResult changes, ActiveTableMode mode) { - synchronized (ActiveTable.class) { - List updated = changes.updatedList; - List purged = changes.purgedList; + List updated = changes.updatedList; + List purged = changes.purgedList; - CoreDao dao = null; - if (mode.equals(ActiveTableMode.OPERATIONAL)) { - dao = operationalDao; - } else { - dao = practiceDao; - } - for (ActiveTableRecord update : updated) { - dao.saveOrUpdate(update); - } - for (ActiveTableRecord delete : purged) { - dao.delete(delete); - } - } + CoreDao dao = (ActiveTableMode.OPERATIONAL.equals(mode)) ? operationalDao + : practiceDao; + dao.bulkSaveOrUpdateAndDelete(updated, purged); } /** @@ -550,22 +578,23 @@ public class ActiveTable { ActiveTableMode tableName, List newRecords, float timeOffset, boolean makeBackup, boolean runIngestAT, String xmlSource) throws JepException { + String scriptName = runIngestAT ? "ingestAT.py" : "MergeVTEC.py"; + IPathManager pathMgr = PathManagerFactory.getPathManager(); + LocalizationContext commonCx = pathMgr.getContext( + LocalizationType.COMMON_STATIC, LocalizationLevel.BASE); + String scriptPath = pathMgr.getFile(commonCx, + FileUtil.join(ActiveTablePyIncludeUtil.VTEC, scriptName)) + .getPath(); + String pythonIncludePath = PyUtil.buildJepIncludePath( + ActiveTablePyIncludeUtil.getCommonPythonIncludePath(), + ActiveTablePyIncludeUtil.getVtecIncludePath(siteId), + ActiveTablePyIncludeUtil.getGfeConfigIncludePath(siteId), + ActiveTablePyIncludeUtil.getIscScriptsIncludePath()); + MergeResult result = null; PythonScript script = null; + ClusterTask writeLock = null; try { - String scriptName = runIngestAT ? "ingestAT.py" : "MergeVTEC.py"; - IPathManager pathMgr = PathManagerFactory.getPathManager(); - LocalizationContext commonCx = pathMgr.getContext( - LocalizationType.COMMON_STATIC, LocalizationLevel.BASE); - String scriptPath = pathMgr.getFile(commonCx, - FileUtil.join(ActiveTablePyIncludeUtil.VTEC, scriptName)) - .getPath(); - String pythonIncludePath = PyUtil.buildJepIncludePath( - ActiveTablePyIncludeUtil.getCommonPythonIncludePath(), - ActiveTablePyIncludeUtil.getVtecIncludePath(siteId), - ActiveTablePyIncludeUtil.getGfeConfigIncludePath(siteId), - ActiveTablePyIncludeUtil.getIscScriptsIncludePath()); - try { script = new PythonScript(scriptPath, pythonIncludePath, ActiveTable.class.getClassLoader()); @@ -575,6 +604,23 @@ public class ActiveTable { throw e; } + boolean logFirst = true; + do { + if (logFirst) { + statusHandler.info("mergeRemoteTable() waiting on lock [" + + ACTIVE_TABLE_LOCK_NAME + ":" + + tableName.toString() + "]."); + logFirst = false; + } + writeLock = ClusterLockUtils.lock(ACTIVE_TABLE_LOCK_NAME, + tableName.toString(), + new CurrentTimeClusterLockHandler(DEFAULT_LOCK_TIMEOUT, + false), true); + } while (!writeLock.getLockState().equals(LockState.SUCCESSFUL)); + statusHandler.info("mergeRemoteTable() obtained lock [" + + ACTIVE_TABLE_LOCK_NAME + ":" + tableName.toString() + + "]."); + try { String site4Char = SiteMap.getInstance().getSite4LetterId( siteId); @@ -597,18 +643,26 @@ public class ActiveTable { "Error merging active table", e); throw e; } + + if (result != null) { + updateTable(siteId, result, tableName); + } } finally { + if (writeLock != null) { + statusHandler.info("mergeRemoteTable() released lock [" + + ACTIVE_TABLE_LOCK_NAME + ":" + tableName.toString() + + "]."); + ClusterLockUtils.unlock(writeLock, true); + } + if (script != null) { script.dispose(); script = null; } } - if (result != null) { - updateTable(siteId, result, tableName); - if (!result.changeList.isEmpty()) { - sendNotification(tableName, result.changeList, "MergeVTEC"); - } + if ((result != null) && (!result.changeList.isEmpty())) { + sendNotification(tableName, result.changeList, "MergeVTEC"); } } diff --git a/edexOsgi/com.raytheon.uf.edex.database/src/com/raytheon/uf/edex/database/dao/CoreDao.java b/edexOsgi/com.raytheon.uf.edex.database/src/com/raytheon/uf/edex/database/dao/CoreDao.java index 943b272171..dde8965879 100644 --- a/edexOsgi/com.raytheon.uf.edex.database/src/com/raytheon/uf/edex/database/dao/CoreDao.java +++ b/edexOsgi/com.raytheon.uf.edex.database/src/com/raytheon/uf/edex/database/dao/CoreDao.java @@ -101,6 +101,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery; * Dec 13, 2013 2555 rjpeter Added processByCriteria and fixed Generics warnings. * Jan 23, 2014 2555 rjpeter Updated processByCriteria to be a row at a time using ScrollableResults. * Apr 23, 2014 2726 rjpeter Updated processByCriteria to throw exceptions back up to caller. + * Feb 23, 2015 4127 dgilling Added bulkSaveOrUpdateAndDelete(). * * * @author bphillip @@ -1143,4 +1144,26 @@ public class CoreDao extends HibernateDaoSupport { return getSessionFactory().getClassMetadata(daoClass); } } + + /** + * Updates/saves a set of records and deletes a set of records in the + * database in a single transaction. + * + * @param updates + * Records to update or add. + * @param deletes + * Records to delete. + */ + public void bulkSaveOrUpdateAndDelete( + final Collection updates, + final Collection deletes) { + txTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + HibernateTemplate ht = getHibernateTemplate(); + ht.saveOrUpdateAll(updates); + ht.deleteAll(deletes); + } + }); + } }