Issue #4127: Add ClusterTask-backed write lock to ActiveTable.

Change-Id: I54309f369e7ef872fd102a764641569d95c62799

Former-commit-id: c564462d34bae64c3339339254d5cb6caba70fda
This commit is contained in:
David Gillingham 2015-02-24 12:39:03 -06:00
parent ebb221d935
commit def338c49f
2 changed files with 184 additions and 107 deletions

View file

@ -61,6 +61,10 @@ import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
@ -97,6 +101,8 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* in updateActiveTable() so records will
* be updated correctly.
* Feb 05, 2015 4099 randerso Fixed latest ETN query for year-end
* Feb 23, 2015 4127 dgilling Use cluster locking to only allow 1 active
* table write at a time.
*
* </pre>
*
@ -111,6 +117,10 @@ public class ActiveTable {
private static final Logger changeLog = Logger
.getLogger("ActiveTableChange");
private static final String ACTIVE_TABLE_LOCK_NAME = "ActiveTableWriteLock";
private static final long DEFAULT_LOCK_TIMEOUT = 5 * TimeUtil.MILLIS_PER_MINUTE;
private static ThreadLocal<PythonScript> threadLocalPythonScript = new ThreadLocal<PythonScript>() {
/*
@ -285,7 +295,7 @@ public class ActiveTable {
*/
private void updateActiveTable(String siteId,
List<ActiveTableRecord> newRecords, float offsetSecs) {
if (newRecords.size() > 0) {
if (!newRecords.isEmpty()) {
ActiveTableMode mode = ActiveTableMode.PRACTICE;
if (newRecords.get(0) instanceof OperationalActiveTableRecord) {
mode = ActiveTableMode.OPERATIONAL;
@ -296,36 +306,67 @@ public class ActiveTable {
IPerformanceStatusHandler perfStat = PerformanceStatus
.getHandler("ActiveTable");
ITimer timer = TimeUtil.getTimer();
timer.start();
List<ActiveTableRecord> activeTable = getActiveTable(issueSiteId, mode);
timer.stop();
perfStat.logDuration("getActiveTable", timer.getElapsedTime());
MergeResult result = null;
ClusterTask writeLock = null;
try {
boolean logFirst = true;
timer.start();
do {
if (logFirst) {
statusHandler
.info("updateActiveTable() waiting on lock ["
+ ACTIVE_TABLE_LOCK_NAME + ":"
+ mode.toString() + "].");
logFirst = false;
}
writeLock = ClusterLockUtils.lock(ACTIVE_TABLE_LOCK_NAME,
mode.toString(), new CurrentTimeClusterLockHandler(
DEFAULT_LOCK_TIMEOUT, false), true);
} while (!writeLock.getLockState().equals(LockState.SUCCESSFUL));
statusHandler
.info("updateActiveTable() obtained lock ["
+ ACTIVE_TABLE_LOCK_NAME + ":"
+ mode.toString() + "].");
timer.stop();
perfStat.logDuration("getLock", timer.getElapsedTime());
// get decoder sites to see if we need to backup active table
Set<String> decoderSites = getDecoderSites(siteId);
// if any new record is from one of the decoder sites
// we need to queue a backup
for (ActiveTableRecord rec : newRecords) {
if (decoderSites.contains(rec.getOfficeid())) {
ActiveTableBackup.queue(mode, activeTable);
break;
timer.reset();
timer.start();
List<ActiveTableRecord> activeTable = getActiveTable(
issueSiteId, mode);
timer.stop();
perfStat.logDuration("getActiveTable", timer.getElapsedTime());
// get decoder sites to see if we need to backup active table
Set<String> decoderSites = getDecoderSites(siteId);
// if any new record is from one of the decoder sites
// we need to queue a backup
for (ActiveTableRecord rec : newRecords) {
if (decoderSites.contains(rec.getOfficeid())) {
ActiveTableBackup.queue(mode, activeTable);
break;
}
}
timer.reset();
timer.start();
result = filterTable(siteId, activeTable, newRecords, mode,
offsetSecs);
timer.stop();
perfStat.logDuration("filterTable", timer.getElapsedTime());
timer.reset();
timer.start();
updateTable(siteId, result, mode);
timer.stop();
perfStat.logDuration("updateTable", timer.getElapsedTime());
} finally {
if (writeLock != null) {
statusHandler.info("updateActiveTable() released lock ["
+ ACTIVE_TABLE_LOCK_NAME + ":" + mode.toString()
+ "].");
ClusterLockUtils.unlock(writeLock, true);
}
}
timer.reset();
timer.start();
MergeResult result = filterTable(siteId, activeTable, newRecords,
mode, offsetSecs);
timer.stop();
perfStat.logDuration("filterTable", timer.getElapsedTime());
timer.reset();
timer.start();
updateTable(siteId, result, mode);
timer.stop();
perfStat.logDuration("updateTable", timer.getElapsedTime());
if (result.changeList.size() > 0) {
if (!result.changeList.isEmpty()) {
sendNotification(mode, result.changeList, "VTECDecoder");
}
}
@ -403,54 +444,52 @@ public class ActiveTable {
ActiveTableMode mode, String phensigList, String action,
String etn, Calendar currentTime, boolean requestValidTimes,
boolean latestEtn) {
synchronized (ActiveTable.class) {
DatabaseQuery query = null;
CoreDao dao = null;
DatabaseQuery query = null;
CoreDao dao = null;
if (mode.equals(ActiveTableMode.OPERATIONAL)) {
query = new DatabaseQuery(OperationalActiveTableRecord.class);
dao = operationalDao;
} else {
query = new DatabaseQuery(PracticeActiveTableRecord.class);
dao = practiceDao;
}
if (phensigList != null) {
query.addQueryParam("phensig", phensigList, "in");
}
if (action != null) {
query.addQueryParam("act", action, "in");
}
if (etn != null) {
query.addQueryParam("etn", etn, "in");
}
if (requestValidTimes && currentTime != null) {
// Current Time
query.addQueryParam("endTime", currentTime, "greater_than");
}
if (latestEtn && currentTime != null) {
Calendar yearStart = Calendar.getInstance();
yearStart.set(currentTime.get(Calendar.YEAR), Calendar.JANUARY,
1, 0, 0);
query.addQueryParam("issueTime", yearStart, "greater_than");
query.addOrder("etn", false);
query.setMaxResults(1);
}
query.addQueryParam("officeid", siteId, "in");
List<ActiveTableRecord> result = null;
try {
result = (List<ActiveTableRecord>) dao.queryByCriteria(query);
} catch (DataAccessLayerException e) {
statusHandler.handle(Priority.PROBLEM,
"Error querying active table for site " + siteId, e);
}
return result;
if (mode.equals(ActiveTableMode.OPERATIONAL)) {
query = new DatabaseQuery(OperationalActiveTableRecord.class);
dao = operationalDao;
} else {
query = new DatabaseQuery(PracticeActiveTableRecord.class);
dao = practiceDao;
}
if (phensigList != null) {
query.addQueryParam("phensig", phensigList, "in");
}
if (action != null) {
query.addQueryParam("act", action, "in");
}
if (etn != null) {
query.addQueryParam("etn", etn, "in");
}
if (requestValidTimes && currentTime != null) {
// Current Time
query.addQueryParam("endTime", currentTime, "greater_than");
}
if (latestEtn && currentTime != null) {
Calendar yearStart = Calendar.getInstance();
yearStart.set(currentTime.get(Calendar.YEAR), Calendar.JANUARY, 1,
0, 0);
query.addQueryParam("issueTime", yearStart, "greater_than");
query.addOrder("etn", false);
query.setMaxResults(1);
}
query.addQueryParam("officeid", siteId, "in");
List<ActiveTableRecord> result = null;
try {
result = (List<ActiveTableRecord>) dao.queryByCriteria(query);
} catch (DataAccessLayerException e) {
statusHandler.handle(Priority.PROBLEM,
"Error querying active table for site " + siteId, e);
}
return result;
}
/**
@ -463,23 +502,12 @@ public class ActiveTable {
*/
private static void updateTable(String siteId, MergeResult changes,
ActiveTableMode mode) {
synchronized (ActiveTable.class) {
List<ActiveTableRecord> updated = changes.updatedList;
List<ActiveTableRecord> purged = changes.purgedList;
List<ActiveTableRecord> updated = changes.updatedList;
List<ActiveTableRecord> purged = changes.purgedList;
CoreDao dao = null;
if (mode.equals(ActiveTableMode.OPERATIONAL)) {
dao = operationalDao;
} else {
dao = practiceDao;
}
for (ActiveTableRecord update : updated) {
dao.saveOrUpdate(update);
}
for (ActiveTableRecord delete : purged) {
dao.delete(delete);
}
}
CoreDao dao = (ActiveTableMode.OPERATIONAL.equals(mode)) ? operationalDao
: practiceDao;
dao.bulkSaveOrUpdateAndDelete(updated, purged);
}
/**
@ -550,22 +578,23 @@ public class ActiveTable {
ActiveTableMode tableName, List<ActiveTableRecord> newRecords,
float timeOffset, boolean makeBackup, boolean runIngestAT,
String xmlSource) throws JepException {
String scriptName = runIngestAT ? "ingestAT.py" : "MergeVTEC.py";
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext commonCx = pathMgr.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
String scriptPath = pathMgr.getFile(commonCx,
FileUtil.join(ActiveTablePyIncludeUtil.VTEC, scriptName))
.getPath();
String pythonIncludePath = PyUtil.buildJepIncludePath(
ActiveTablePyIncludeUtil.getCommonPythonIncludePath(),
ActiveTablePyIncludeUtil.getVtecIncludePath(siteId),
ActiveTablePyIncludeUtil.getGfeConfigIncludePath(siteId),
ActiveTablePyIncludeUtil.getIscScriptsIncludePath());
MergeResult result = null;
PythonScript script = null;
ClusterTask writeLock = null;
try {
String scriptName = runIngestAT ? "ingestAT.py" : "MergeVTEC.py";
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext commonCx = pathMgr.getContext(
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
String scriptPath = pathMgr.getFile(commonCx,
FileUtil.join(ActiveTablePyIncludeUtil.VTEC, scriptName))
.getPath();
String pythonIncludePath = PyUtil.buildJepIncludePath(
ActiveTablePyIncludeUtil.getCommonPythonIncludePath(),
ActiveTablePyIncludeUtil.getVtecIncludePath(siteId),
ActiveTablePyIncludeUtil.getGfeConfigIncludePath(siteId),
ActiveTablePyIncludeUtil.getIscScriptsIncludePath());
try {
script = new PythonScript(scriptPath, pythonIncludePath,
ActiveTable.class.getClassLoader());
@ -575,6 +604,23 @@ public class ActiveTable {
throw e;
}
boolean logFirst = true;
do {
if (logFirst) {
statusHandler.info("mergeRemoteTable() waiting on lock ["
+ ACTIVE_TABLE_LOCK_NAME + ":"
+ tableName.toString() + "].");
logFirst = false;
}
writeLock = ClusterLockUtils.lock(ACTIVE_TABLE_LOCK_NAME,
tableName.toString(),
new CurrentTimeClusterLockHandler(DEFAULT_LOCK_TIMEOUT,
false), true);
} while (!writeLock.getLockState().equals(LockState.SUCCESSFUL));
statusHandler.info("mergeRemoteTable() obtained lock ["
+ ACTIVE_TABLE_LOCK_NAME + ":" + tableName.toString()
+ "].");
try {
String site4Char = SiteMap.getInstance().getSite4LetterId(
siteId);
@ -597,18 +643,26 @@ public class ActiveTable {
"Error merging active table", e);
throw e;
}
if (result != null) {
updateTable(siteId, result, tableName);
}
} finally {
if (writeLock != null) {
statusHandler.info("mergeRemoteTable() released lock ["
+ ACTIVE_TABLE_LOCK_NAME + ":" + tableName.toString()
+ "].");
ClusterLockUtils.unlock(writeLock, true);
}
if (script != null) {
script.dispose();
script = null;
}
}
if (result != null) {
updateTable(siteId, result, tableName);
if (!result.changeList.isEmpty()) {
sendNotification(tableName, result.changeList, "MergeVTEC");
}
if ((result != null) && (!result.changeList.isEmpty())) {
sendNotification(tableName, result.changeList, "MergeVTEC");
}
}

View file

@ -101,6 +101,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* Dec 13, 2013 2555 rjpeter Added processByCriteria and fixed Generics warnings.
* Jan 23, 2014 2555 rjpeter Updated processByCriteria to be a row at a time using ScrollableResults.
* Apr 23, 2014 2726 rjpeter Updated processByCriteria to throw exceptions back up to caller.
* Feb 23, 2015 4127 dgilling Added bulkSaveOrUpdateAndDelete().
* </pre>
*
* @author bphillip
@ -1143,4 +1144,26 @@ public class CoreDao extends HibernateDaoSupport {
return getSessionFactory().getClassMetadata(daoClass);
}
}
/**
* Updates/saves a set of records and deletes a set of records in the
* database in a single transaction.
*
* @param updates
* Records to update or add.
* @param deletes
* Records to delete.
*/
public void bulkSaveOrUpdateAndDelete(
final Collection<? extends Object> updates,
final Collection<? extends Object> deletes) {
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
HibernateTemplate ht = getHibernateTemplate();
ht.saveOrUpdateAll(updates);
ht.deleteAll(deletes);
}
});
}
}