ASM #629 - GFE: when runProcedure terminates unexpectedly locks remain in gfelocktable.

Change-Id: I5772661758a26c47019c3939ceff73ca0eb6f50e

Former-commit-id: 9db91abd8a [formerly ab0f7f0f627ab3926a219940f1267dc460e74e14]
Former-commit-id: 6897cfd7b9
This commit is contained in:
Michael Gamazaychikov 2015-03-03 09:37:39 -05:00
parent c73879c8c4
commit b6fab0df30
5 changed files with 90 additions and 166 deletions

View file

@ -235,7 +235,6 @@
<property name="provider" ref="brokerConnectionsProvider" />
</bean>
<!-- Service Backup Handlers -->
<bean id="GetPrimarySiteHandler" class="com.raytheon.uf.edex.site.handlers.GetPrimarySiteHandler"/>

View file

@ -14,5 +14,5 @@ purge.svcbu.logs.cron=0+30+0+*+*+?
purge.gfe.products.isc.cron=0+45+0+*+*+?
purge.gfe.products.atbl.cron=0+50+0+*+*+?
# Interval at which the gfe orphaned locks are cleared
# Interval at which gfe orphaned locks are cleared
clear.gfe.orphaned.locks.cron = 0+0/10+*+*+*+?

View file

@ -34,6 +34,7 @@ import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.ParmID;
//import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
import com.raytheon.uf.common.dataplugin.gfe.server.lock.Lock;
import com.raytheon.uf.common.dataplugin.gfe.server.lock.LockTable;
import com.raytheon.uf.common.message.WsId;
@ -53,7 +54,7 @@ import com.raytheon.uf.edex.database.dao.DaoConfig;
* 04/19/13 #1949 rjpeter Normalized GFE Database.
* 06/20/13 #2127 rjpeter Set session to read only.
* 10/16/2014 3454 bphillip Upgrading to Hibernate 4
* 01/07/15 629 mgamazaychikov Add getAllLocks method.
* 03/03/15 629 mgamazaychikov Add getAllLocks method.
* </pre>
*
* @author bphillip
@ -195,8 +196,8 @@ public class GFELockDao extends CoreDao {
}
@SuppressWarnings("unchecked")
public Map<ParmID, LockTable> getAllLocks() throws DataAccessLayerException{
Map<ParmID, LockTable> lockMap = new HashMap<ParmID, LockTable>();
public List<Lock> getAllLocks(final String siteId)
throws DataAccessLayerException {
Session sess = null;
Transaction tx = null;
@ -204,24 +205,12 @@ public class GFELockDao extends CoreDao {
sess = getSession();
sess.setDefaultReadOnly(true);
tx = sess.beginTransaction();
Query query = sess
.createQuery("FROM Lock");
.createQuery("FROM Lock WHERE parmId in (SELECT id FROM ParmID WHERE dbId in (SELECT id FROM DatabaseID WHERE siteId = ?))");
query.setParameter(0, siteId);
List<Lock> locks = query.list();
tx.commit();
// populate Lock table
for (Lock lock : locks) {
WsId wid = lock.getWsId();
ParmID pid = lock.getParmId();
LockTable lockTable = lockMap.get(pid);
if (lockTable == null) {
lockTable = new LockTable(pid, new ArrayList<Lock>(), wid);
lockMap.put(pid, lockTable);
}
lockTable.addLock(lock);
}
return lockMap;
return locks;
} catch (Exception e) {
if (tx != null) {
try {
@ -230,16 +219,13 @@ public class GFELockDao extends CoreDao {
logger.error("Error occurred rolling back transaction", e1);
}
}
throw new DataAccessLayerException(
"Unable to look up locks ", e);
throw new DataAccessLayerException("Unable to look up locks for site " + siteId, e);
} finally {
if (sess != null) {
try {
sess.close();
} catch (Exception e) {
logger.error(
"Error occurred closing database session", e);
logger.error("Error occurred closing database session", e);
}
}
}

View file

@ -22,21 +22,22 @@ package com.raytheon.edex.plugin.gfe.server.lock;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import com.raytheon.edex.plugin.gfe.server.IFPServer;
import com.raytheon.edex.plugin.gfe.server.lock.LockManager;
import com.raytheon.edex.plugin.gfe.util.SendNotifications;
import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException;
import com.raytheon.uf.common.dataplugin.gfe.server.lock.Lock;
import com.raytheon.uf.common.dataplugin.gfe.server.lock.LockTable;
import com.raytheon.uf.common.dataplugin.gfe.server.lock.LockTable.LockMode;
import com.raytheon.uf.common.dataplugin.gfe.server.message.ServerResponse;
import com.raytheon.uf.common.dataplugin.gfe.server.notify.GfeNotification;
import com.raytheon.uf.common.dataplugin.gfe.server.notify.LockNotification;
import com.raytheon.uf.common.dataplugin.gfe.server.request.LockRequest;
import com.raytheon.uf.common.message.WsId;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.TimeRange;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.esb.camel.jms.IBrokerConnectionsProvider;
/**
@ -50,7 +51,7 @@ import com.raytheon.uf.edex.esb.camel.jms.IBrokerConnectionsProvider;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 07, 2015 629 mgamazaychikov Initial creation
* Mar 03, 2015 629 mgamazaychikov Initial creation
*
* </pre>
*
@ -62,148 +63,90 @@ public class ClearGfeOrphanedLocks {
private static IBrokerConnectionsProvider provider;
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(ClearGfeOrphanedLocks.class);
public final String CAVE = "CAVE";
public static void setProvider(IBrokerConnectionsProvider provider) {
ClearGfeOrphanedLocks.provider = provider;
}
private Set<String> breakAllLocks(List<LockTable> lockTables,
LockManager lockMgr) {
Set<String> inactives = new HashSet<String>();
for (LockTable lockTable : lockTables) {
for (Lock lock : lockTable.getLocks()) {
TimeRange tr = lock.getTimeRange();
List<LockRequest> lreq = new ArrayList<LockRequest>();
lreq.add(new LockRequest(lock.getParmId(), tr,
private void breakLocks(Set<String> clients, List<Lock> lockList,
LockManager lockMgr, String siteId) {
boolean foundOrpanedLocks = false;
List<LockRequest> lreq = new ArrayList<LockRequest>();
StringBuilder sb = new StringBuilder();
for (Lock lock : lockList) {
String lockWsid = lock.getWsId().toString();
if (!clients.contains(lockWsid)) {
foundOrpanedLocks = true;
List<Lock> lst = new ArrayList<Lock>();
lst.add(lock);
// Inactive clients found
lreq.add(new LockRequest(lock.getParmId(), lock.getTimeRange(),
LockMode.BREAK_LOCK));
lockMgr.requestLockChange(lreq, lock.getWsId());
if (!inactives.contains(lock.getWsId().toPrettyString())
&& !inactives.contains(lock.getParmId().toString())) {
String message = " Breaking orphaned lock on "
+ lock.getParmId().toString() + " owned by "
+ lock.getWsId().toPrettyString() + ".";
inactives.add(message);
}
sb.append(" Breaking orphaned lock for site " + siteId + " on "
+ lock.getParmId().toString() + " owned by "
+ lock.getWsId().toPrettyString() + ".");
}
}
return inactives;
}
private Set<String> breakLocks(Set<String> clients,
List<LockTable> lockTables, LockManager lockMgr) {
Set<String> inactives = new HashSet<String>();
for (LockTable lockTable : lockTables) {
for (Lock lock : lockTable.getLocks()) {
String lockedWsid = lock.getWsId().toString();
for (String client : clients) {
if (!lockedWsid.equals(client)) {
// Inactive CAVE clients found - break its lock
List<LockRequest> lreq = new ArrayList<LockRequest>();
lreq.add(new LockRequest(lock.getParmId(), lock
.getTimeRange(), LockMode.BREAK_LOCK));
lockMgr.requestLockChange(lreq, lock.getWsId());
if (!inactives
.contains(lock.getWsId().toPrettyString())
&& !inactives.contains(lock.getParmId()
.toString())) {
String message = " Breaking orphaned lock on "
+ lock.getParmId().toString()
+ " owned by "
+ lock.getWsId().toPrettyString() + ".";
inactives.add(message);
}
if (foundOrpanedLocks) {
statusHandler.info(sb.toString());
WsId requestor = new WsId(null, null, "ClearGfeOrphanedLocks");
ServerResponse<List<LockTable>> sr = lockMgr.requestLockChange(
lreq, requestor);
if (sr.isOkay()) {
try {
List<LockTable> lockTables = sr.getPayload();
List<GfeNotification> notes = new ArrayList<GfeNotification>(
lockTables.size());
for (LockTable table : lockTables) {
notes.add(new LockNotification(table, siteId));
}
ServerResponse<?> notifyResponse = SendNotifications
.send(notes);
if (!notifyResponse.isOkay()) {
statusHandler.error(notifyResponse.message());
}
// send out grid update notifications
notifyResponse = SendNotifications.send(sr.getNotifications());
if (!notifyResponse.isOkay()) {
statusHandler.error(notifyResponse.message());
}
} catch (Exception e) {
statusHandler.error("Error sending lock notification", e);
}
}
} else {
statusHandler.error(sr.message());
}
return;
} else {
statusHandler.info(" No orphaned locks found for site " + siteId
+ ".");
return;
}
return inactives;
}
@SuppressWarnings("unchecked")
public void clearLocksCron() throws Exception {
Date executionTime = new Date(System.currentTimeMillis());
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
String msg = "Started at " + executionTime;
statusHandler.info(msg);
}
statusHandler
.info("Started at " + new Date(System.currentTimeMillis()));
Set<String> clients = new HashSet<String>(provider.getConnections());
Set<String> inactives = new HashSet<String>();
String siteId = EDEXUtil.getEdexSite();
IFPServer ifpServer = IFPServer.getActiveServer(siteId);
if (ifpServer == null) {
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
String msg = "No active IFPServer for site " + siteId;
statusHandler.info(msg);
return;
}
if (IFPServer.getActiveServers().size() == 0) {
statusHandler.info("No active IFPServer found.");
return;
}
LockManager lockMgr = ifpServer.getLockMgr();
List<LockTable> lockTables = (List<LockTable>) lockMgr.getAllLocks()
.getPayload();
/*
* There are no locks in the db.
*/
if (lockTables.size() == 0) {
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
String msg = "No locks found for site " + siteId;
statusHandler.info(msg);
return;
}
}
/*
* Filter out non-CAVE clients.
*/
for (Iterator<String> iterator = clients.iterator(); iterator.hasNext();) {
String client = iterator.next();
if (!client.contains(CAVE)) {
iterator.remove();
}
}
/*
* If there are no active CAVE clients but the locks exist, they all
* must be orphaned -> break the locks.
*/
if (clients.isEmpty() && lockTables.size() > 0) {
inactives = breakAllLocks(lockTables, lockMgr);
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
StringBuilder sb = new StringBuilder();
for (String in : inactives) {
sb.append(in);
}
statusHandler.info(sb.toString());
return;
}
}
/*
* There are active CAVE clients, find orphaned locks and break the
* locks.
*/
inactives = breakLocks(clients, lockTables, lockMgr);
if (inactives.isEmpty()) {
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
String msg = "No orphaned locks found for site " + siteId;
statusHandler.info(msg);
return;
}
} else {
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
StringBuilder sb = new StringBuilder();
for (String in : inactives) {
sb.append(in);
}
statusHandler.info(sb.toString());
List<IFPServer> ifpServers = IFPServer.getActiveServers();
for (IFPServer ifps : ifpServers) {
LockManager lockMgr = ifps.getLockMgr();
String siteId = ifps.getSiteId();
List<Lock> lockList;
try {
lockList = (List<Lock>) lockMgr.getAllLocks(siteId);
// find orphaned locks and break them
breakLocks(clients, lockList, lockMgr, siteId);
return;
} catch (GfeException e) {
statusHandler.error("Error retrieving all locks", e);
}
}
}

View file

@ -68,7 +68,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
* 06/13/13 #2044 randerso Converted from singleton to instance per
* site managed by IFPServer
* 10/07/2014 #3684 randerso Restructured IFPServer start up
* 01/07/15 629 mgamazaychikov Add getAllLocks method.
* 03/03/15 629 mgamazaychikov Add getAllLocks method.
* </pre>
*
* @author bphillip
@ -909,19 +909,15 @@ public class LockManager {
}
}
public ServerResponse <?> getAllLocks() {
ServerResponse<List<LockTable>> sr = new ServerResponse<List<LockTable>>();
public List<Lock> getAllLocks(String siteId) throws GfeException {
List<Lock> lt = new ArrayList<Lock>();
try {
List<LockTable> payLoad = null;
Map<ParmID, LockTable> lockMap = dao.getAllLocks();
payLoad = new ArrayList<LockTable>(lockMap.size());
payLoad.addAll(lockMap.values());
sr.setPayload(payLoad);
lt = dao.getAllLocks(siteId);
} catch (Exception e) {
sr.addMessage("Error getting lock tables for");
sr.setPayload(new ArrayList<LockTable>(0));
throw new GfeException("Error getting lock tables for site "
+ this.siteId + ": " + e.getMessage());
}
return sr;
return lt;
}
}