Issue #2726: Add hooks to stop archiving processes during shutdown.

Change-Id: I16a219d8300eae431a8baaf27bcb00707908c67a

Former-commit-id: 75ec73e02885fe0f646bc8961f3ea876597abab9
This commit is contained in:
Richard Peter 2014-04-24 17:52:52 -05:00
parent a511f8d4e7
commit 37342cba87
27 changed files with 289 additions and 192 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Ccfp Plug-in
Bundle-SymbolicName: com.raytheon.uf.viz.ccfp;singleton:=true
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Require-Bundle: com.raytheon.uf.viz.core
Bundle-RequiredExecutionEnvironment: JavaSE-1.6

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Aviation Plug-in
Bundle-SymbolicName: com.raytheon.viz.aviation;singleton:=true
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Activator: com.raytheon.viz.aviation.activator.Activator
Bundle-Vendor: Raytheon
Require-Bundle: org.eclipse.ui,

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: JOGL Win64 Specific Fragment
Bundle-SymbolicName: javax.media.opengl.win64
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Fragment-Host: javax.media.opengl;bundle-version="1.1.1"
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Eclipse-PlatformFilter: (& (osgi.os=win32) (osgi.arch=x86_64))

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Windows 64-bit Jep Library
Bundle-SymbolicName: org.jep.win64
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Fragment-Host: org.jep;bundle-version="2.3.0"
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Eclipse-PlatformFilter: (& (osgi.os=win32) (osgi.arch=x86_64))

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: CCFP common Plug-in
Bundle-SymbolicName: com.raytheon.uf.common.dataplugin.ccfp
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Eclipse-RegisterBuddy: com.raytheon.uf.common.serialization
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Common Http
Bundle-SymbolicName: com.raytheon.uf.common.http
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Export-Package: com.raytheon.uf.common.http,

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Localization Plug-in
Bundle-SymbolicName: com.raytheon.uf.common.localization
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: Raytheon
Require-Bundle: org.apache.commons.lang,
com.raytheon.uf.common.serialization,

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: NetCDF Bufr
Bundle-SymbolicName: com.raytheon.uf.common.nc.bufr
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: ucar.nc2,

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Utility Plug-in
Bundle-SymbolicName: com.raytheon.uf.common.util
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: org.apache.commons.beanutils;bundle-version="1.8.3",

View file

@ -59,7 +59,9 @@ import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.core.exception.ShutdownException;
import com.raytheon.uf.edex.database.plugin.PluginDao;
import com.raytheon.uf.edex.database.processor.IDatabaseProcessor;
@ -78,6 +80,7 @@ import com.raytheon.uf.edex.database.processor.IDatabaseProcessor;
* Jan 23, 2014 2555 rjpeter Updated to be a row at a time using ScrollableResults.
* Feb 04, 2014 2770 rferrel The dumpPdos now dumps all PluginDataObjects.
* Feb 12, 2014 2784 rjpeter Update logging for dup elim scenarios.
* Apr 23, 2014 2726 rjpeter Add shutdown checks to allow for timely shutdown.
* </pre>
*
* @author rjpeter
@ -143,7 +146,7 @@ public class DatabaseArchiveProcessor<T extends PersistableDataObject<?>>
* .util.List)
*/
@Override
public boolean process(T object) {
public boolean process(T object) throws ShutdownException {
if (object != null) {
if (pdosByFile == null) {
pdosByFile = new HashMap<String, List<PersistableDataObject<?>>>(
@ -191,7 +194,7 @@ public class DatabaseArchiveProcessor<T extends PersistableDataObject<?>>
* archives any associated hdf5 files.
*/
@Override
public void finish() {
public void finish() throws ShutdownException {
if (entriesInMemory > 0) {
try {
savePdoMap(pdosByFile);
@ -228,6 +231,7 @@ public class DatabaseArchiveProcessor<T extends PersistableDataObject<?>>
}
for (String dataStoreFile : datastoreFilesToArchive) {
EDEXUtil.checkShuttingDown();
IDataStore ds = DataStoreFactory.getDataStore(new File(FileUtil
.join(pluginName, dataStoreFile)));
// all dataStore files should end with .h5
@ -346,12 +350,13 @@ public class DatabaseArchiveProcessor<T extends PersistableDataObject<?>>
* @throws IOException
*/
protected void savePdoMap(Map<String, List<PersistableDataObject<?>>> pdoMap)
throws SerializationException, IOException {
throws SerializationException, IOException, ShutdownException {
StringBuilder baseDir = new StringBuilder(160);
Set<Object> identifierSet = null;
for (Map.Entry<String, List<PersistableDataObject<?>>> entry : pdoMap
.entrySet()) {
EDEXUtil.checkShuttingDown();
baseDir.setLength(0);
baseDir.append(archivePath).append(File.separator)
.append(pluginName).append(File.separator)
@ -421,10 +426,11 @@ public class DatabaseArchiveProcessor<T extends PersistableDataObject<?>>
protected List<PersistableDataObject<?>> dupElimPreviousFiles(
SortedMap<Integer, File> fileMap,
List<PersistableDataObject<?>> pdos, Set<Object> identifierSet)
throws IOException, SerializationException {
throws IOException, SerializationException, ShutdownException {
if (!fileMap.isEmpty()) {
Iterator<File> fileIter = fileMap.values().iterator();
while (fileIter.hasNext()) {
EDEXUtil.checkShuttingDown();
File dataFile = fileIter.next();
int dupElimUntil = Integer.MAX_VALUE;
FileStatus prevFileStatus = filesCreatedThisSession

View file

@ -38,6 +38,7 @@ import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
@ -67,6 +68,7 @@ import com.raytheon.uf.edex.database.plugin.PluginFactory;
* Dec 13, 2013 2555 rjpeter Refactored logic into DatabaseArchiveProcessor.
* Feb 12, 2014 2784 rjpeter Fixed clusterLock to not update the time by default.
* Apr 01, 2014 2862 rferrel Add exclusive lock at plug-in level.
* Apr 23, 2014 2726 rjpeter Added shutdown hook for quicker shutdown while archiver is running.
* </pre>
*
* @author rjpeter
@ -92,7 +94,7 @@ public class DatabaseArchiver implements IPluginArchiver {
private static final long MIN_DURATION_MILLIS = 30 * TimeUtil.MILLIS_PER_MINUTE;
/** Maximum time increment to archive, note based off of insertTime. */
private static final long MAX_DURATION_MILLIS = 120 * TimeUtil.MILLIS_PER_MINUTE;
private static final long MAX_DURATION_MILLIS = 60 * TimeUtil.MILLIS_PER_MINUTE;
/** Default batch size for database queries */
private static final Integer defaultBatchSize = 10000;
@ -210,6 +212,10 @@ public class DatabaseArchiver implements IPluginArchiver {
}
public void archivePluginData(String pluginName, String archivePath) {
if (EDEXUtil.isShuttingDown()) {
return;
}
File archiveDir = new File(archivePath);
File pluginDir = new File(archiveDir, pluginName);
ClusterTask ctPlugin = getWriteLock(pluginDir.getAbsolutePath());
@ -281,8 +287,8 @@ public class DatabaseArchiver implements IPluginArchiver {
processor.setDebugArchiver(debugArchiver);
processor.setBatchSize(batchSize.intValue());
while ((startTime != null) && (endTime != null)
&& !processor.isFailed()) {
while (!EDEXUtil.isShuttingDown() && (startTime != null)
&& (endTime != null) && !processor.isFailed()) {
statusHandler.info(pluginName + ": Checking for records from "
+ TimeUtil.formatDate(startTime) + " to "
+ TimeUtil.formatDate(endTime));

View file

@ -43,6 +43,8 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.exception.ShutdownException;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
@ -61,7 +63,7 @@ import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler.LockType;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 01, 2014 2862 rferrel Initial creation
*
* Apr 24, 2014 2726 rjpeter Added shutdown cancel
* </pre>
*
* @author rferrel
@ -107,7 +109,8 @@ public class ArchivePurgeManager {
* @param archive
* @return purgeCount
*/
public int purgeExpiredFromArchive(ArchiveConfig archive) {
public int purgeExpiredFromArchive(ArchiveConfig archive)
throws ShutdownException {
String archiveRootDirPath = archive.getRootDir();
File archiveRootDir = new File(archiveRootDirPath);
@ -332,7 +335,9 @@ public class ArchivePurgeManager {
private int purgeDir(File dir, IOFileFilter defaultTimeFilter,
Calendar minPurgeTime, Calendar extPurgeTime,
CategoryFileDateHelper helper, CategoryConfig category,
ClusterTask ct) {
ClusterTask ct) throws ShutdownException {
EDEXUtil.checkShuttingDown();
int purgeCount = 0;
File[] dirFiles = dir.listFiles();
@ -342,6 +347,7 @@ public class ArchivePurgeManager {
}
for (File file : dirFiles) {
EDEXUtil.checkShuttingDown();
updateLockTime(ct);
if (!file.isHidden()) {
@ -408,13 +414,18 @@ public class ArchivePurgeManager {
* @param fileDataFilter
* @return purgeCount
*/
private int purgeDir(File dir, IOFileFilter fileDataFilter) {
private int purgeDir(File dir, IOFileFilter fileDataFilter)
throws ShutdownException {
EDEXUtil.checkShuttingDown();
int purgeCount = 0;
File[] dirFiles = dir.listFiles();
if (dirFiles == null) {
sendPurgeMessage();
} else {
for (File file : dirFiles) {
EDEXUtil.checkShuttingDown();
if (!file.isHidden()) {
if (file.isDirectory()) {
purgeCount += purgeDir(file, fileDataFilter);

View file

@ -27,6 +27,7 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.exception.ShutdownException;
/**
* Purge task to purge archived data based on configured expiration.
@ -67,29 +68,37 @@ public class ArchivePurger {
ITimer timer = TimeUtil.getTimer();
timer.start();
statusHandler.info("Archive Purge started.");
ArchivePurgeManager manager = ArchivePurgeManager.getInstance();
manager.reset();
Collection<ArchiveConfig> archives = manager.getArchives();
for (ArchiveConfig archive : archives) {
ITimer archiveTimer = TimeUtil.getTimer();
archiveTimer.start();
int purgeCount = manager.purgeExpiredFromArchive(archive);
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
StringBuilder sb = new StringBuilder(archive.getName());
sb.append("::Archive Purged ");
sb.append(purgeCount);
sb.append(" file");
if (purgeCount != 1) {
sb.append("s");
try {
ArchivePurgeManager manager = ArchivePurgeManager.getInstance();
manager.reset();
Collection<ArchiveConfig> archives = manager.getArchives();
for (ArchiveConfig archive : archives) {
ITimer archiveTimer = TimeUtil.getTimer();
archiveTimer.start();
int purgeCount = manager.purgeExpiredFromArchive(archive);
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
StringBuilder sb = new StringBuilder(archive.getName());
sb.append("::Archive Purged ");
sb.append(purgeCount);
sb.append(" file");
if (purgeCount != 1) {
sb.append("s");
}
sb.append(" in ")
.append(TimeUtil.prettyDuration(archiveTimer
.getElapsedTime())).append(".");
statusHandler.info(sb.toString());
}
sb.append(" in ")
.append(TimeUtil.prettyDuration(archiveTimer
.getElapsedTime())).append(".");
statusHandler.info(sb.toString());
}
statusHandler.info("Archive Purge finished. Time to run: "
+ TimeUtil.prettyDuration(timer.getElapsedTime()));
} catch (ShutdownException e) {
statusHandler
.info("Aborting Purge due to EDEX shutdown being initiated. Time to run: "
+ TimeUtil.prettyDuration(timer
.getElapsedTime()));
}
statusHandler.info("Archive Purge finished. Time to run: "
+ TimeUtil.prettyDuration(timer.getElapsedTime()));
} else {
statusHandler.info("Archive Purge disabled, exiting");
}

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Auth Plug-in
Bundle-SymbolicName: com.raytheon.uf.edex.auth
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: com.raytheon.uf.common.serialization;bundle-version="1.11.31",

View file

@ -84,20 +84,6 @@
version="0.0.0"
unpack="false"/>
<plugin
id="com.raytheon.uf.common.management"
download-size="0"
install-size="0"
version="0.0.0"
unpack="false"/>
<plugin
id="com.raytheon.uf.edex.management"
download-size="0"
install-size="0"
version="0.0.0"
unpack="false"/>
<plugin
id="com.raytheon.uf.common.useradmin"
download-size="0"

View file

@ -2,11 +2,12 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Core
Bundle-SymbolicName: com.raytheon.uf.edex.core
Bundle-Version: 1.12.1174.qualifier
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Export-Package: com.raytheon.uf.edex.core,
com.raytheon.uf.edex.core.dataplugin,
com.raytheon.uf.edex.core.exception,
com.raytheon.uf.edex.core.hdf5,
com.raytheon.uf.edex.core.modes,
com.raytheon.uf.edex.core.props

View file

@ -40,6 +40,7 @@ import com.raytheon.uf.common.message.StatusMessage;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.core.exception.ShutdownException;
import com.raytheon.uf.edex.core.props.EnvProperties;
import com.raytheon.uf.edex.core.props.PropertiesFactory;
@ -82,6 +83,17 @@ public class EDEXUtil implements ApplicationContextAware {
private static final Object waiter = new Object();
private static volatile boolean shuttingDown = false;
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
shuttingDown = true;
}
});
}
@Override
public void setApplicationContext(ApplicationContext context)
throws BeansException {
@ -160,6 +172,26 @@ public class EDEXUtil implements ApplicationContextAware {
return CONTEXT.containsBean(name);
}
/**
* True if shutdown has been initiated, false otherwise.
*
* @return
*/
public static boolean isShuttingDown() {
return shuttingDown;
}
/**
* If EDEX is shutting down throws a ShutdownException
*
* @throws ShutdownException
*/
public static void checkShuttingDown() throws ShutdownException {
if (shuttingDown) {
throw new ShutdownException();
}
}
/**
* Retrieve a URL to a new file with a UUID name
*

View file

@ -107,16 +107,18 @@ public abstract class EdexTimerBasedThread implements IContextStateProcessor {
"Error occurred during processing", e);
}
try {
/*
* use waiter to allow shutdown to wake thread for immediate
* shutdown
*/
synchronized (threads) {
threads.wait(threadSleepInterval);
if (running) {
try {
/*
* use waiter to allow shutdown to wake thread for
* immediate shutdown
*/
synchronized (threads) {
threads.wait(threadSleepInterval);
}
} catch (InterruptedException e) {
// ignore
}
} catch (InterruptedException e) {
// ignore
}
}
} finally {

View file

@ -0,0 +1,71 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.core.exception;
import com.raytheon.uf.edex.core.EdexException;
/**
* Exception thrown during shutdown to allow for easy restart of transacted
* tasks.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 23, 2014 2726 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class ShutdownException extends EdexException {
/**
* Default serial verion UID
*/
private static final long serialVersionUID = 1L;
public ShutdownException() {
super("Aborting process, EDEX shutting down");
}
/**
* Create a Shutdown Exception instance from only a message
*
* @param message
*/
public ShutdownException(String message) {
super(message);
}
/**
* Create a Shutdown Exception instance from both a message and a cause
*
* @param message
* @param cause
*/
public ShutdownException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@ -99,7 +99,8 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
* Nov 08, 2013 2361 njensen Changed method signature of saveOrUpdate to take Objects, not PersistableDataObjects
* Dec 13, 2013 2555 rjpeter Added processByCriteria and fixed Generics warnings.
* Jan 23, 2014 2555 rjpeter Updated processByCriteriato be a row at a time using ScrollableResults.
* Jan 23, 2014 2555 rjpeter Updated processByCriteria to be a row at a time using ScrollableResults.
* Apr 23, 2014 2726 rjpeter Updated processByCriteria to throw exceptions back up to caller.
* </pre>
*
* @author bphillip
@ -494,24 +495,34 @@ public class CoreDao extends HibernateDaoSupport {
.scroll(ScrollMode.FORWARD_ONLY);
boolean continueProcessing = true;
while (rs.next() && continueProcessing) {
Object[] row = rs.get();
if (row.length > 0) {
continueProcessing = processor
.process((T) row[0]);
}
count++;
if ((count % batchSize) == 0) {
getSession().clear();
try {
while (rs.next() && continueProcessing) {
Object[] row = rs.get();
if (row.length > 0) {
continueProcessing = processor
.process((T) row[0]);
}
count++;
if ((count % batchSize) == 0) {
getSession().clear();
}
}
processor.finish();
} catch (Exception e) {
/*
* Only way to propogate the error to the caller
* is to throw a runtime exception
*/
throw new RuntimeException(
"Error occurred during processing", e);
}
processor.finish();
return count;
}
});
} catch (TransactionException e) {
throw new DataAccessLayerException("Transaction failed", e);
} catch (Exception e) {
throw new DataAccessLayerException(
"Error occurred during processing", e);
}
return rowsProcessed;

View file

@ -44,12 +44,12 @@ public interface IDatabaseProcessor<T> {
* @param row
* @return True if should continue processing, false otherwise.
*/
public boolean process(T row);
public boolean process(T row) throws Exception;
/**
* Perform any post processing if necessary.
*/
public void finish();
public void finish() throws Exception;
/**
* Get the batch size of the query.

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Camel Plug-in
Bundle-SymbolicName: com.raytheon.uf.edex.esb.camel
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: Raytheon
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: org.apache.camel;bundle-version="1.0.0",

View file

@ -155,7 +155,7 @@ public class ContextManager implements ApplicationContextAware,
.addAll(ContextDependencyMapping.DEPENDENCY_ENDPOINT_TYPES);
internalEndpointTypes.add("timer");
internalEndpointTypes.add("quartz");
internalEndpointTypes.add("clusteredquartz");
internalEndpointTypes.add("direct");
}
/**
@ -343,117 +343,18 @@ public class ContextManager implements ApplicationContextAware,
.info("Spring Context not set. Start up never completed, cannot orderly shutdown");
}
statusHandler.info("Context Manager stopping routes");
statusHandler.info("Context Manager stopping contexts");
try {
/*
* begin immediate shutdown of routes that are not an internal
* type
*/
LinkedList<Route> routesToStop = new LinkedList<Route>();
ContextData ctxData = getContextData();
List<CamelContext> contexts = ctxData.getContexts();
List<Future<Pair<CamelContext, Boolean>>> callbacks = new LinkedList<Future<Pair<CamelContext, Boolean>>>();
for (final CamelContext context : contexts) {
/*
* group routes by context due to sync lock at context level
* for stopping a route
*/
List<Route> routes = context.getRoutes();
if ((routes != null) && (routes.size() > 0)) {
for (Route route : routes) {
String uri = route.getEndpoint().getEndpointUri();
Pair<String, String> typeAndName = ContextData
.getEndpointTypeAndName(uri);
String type = typeAndName.getFirst();
if (!internalEndpointTypes.contains(type)) {
routesToStop.add(route);
}
}
}
if (routesToStop.size() > 0) {
final IContextStateManager stateMgr = getStateManager(context);
final List<Route> tmp = routesToStop;
callbacks
.add(service
.submit(new Callable<Pair<CamelContext, Boolean>>() {
@Override
public Pair<CamelContext, Boolean> call()
throws Exception {
boolean rval = true;
for (Route route : tmp) {
try {
statusHandler.info("Stopping route ["
+ route.getId()
+ "]");
rval &= stateMgr
.stopRoute(route);
} catch (Exception e) {
statusHandler.error(
"Error occurred closing route: "
+ route.getId(),
e);
}
}
return new Pair<CamelContext, Boolean>(
context, rval);
}
}));
routesToStop = new LinkedList<Route>();
}
callbacks.add(service.submit(new StopContext(context)));
}
List<CamelContext> failures = waitForCallbacks(callbacks,
"Waiting for external routes to shutdown: ", 1000);
for (CamelContext failure : failures) {
statusHandler.error("Context [" + failure.getName()
+ "] has routes that failed to stop");
}
statusHandler.info("Shutting down contexts");
for (final CamelContext context : contexts) {
final IContextStateManager stateManager = getStateManager(context);
if (stateManager.isContextStoppable(context)) {
callbacks
.add(service
.submit(new Callable<Pair<CamelContext, Boolean>>() {
@Override
public Pair<CamelContext, Boolean> call()
throws Exception {
boolean rval = false;
try {
statusHandler.info("Stopping context ["
+ context.getName()
+ "]");
rval = stateManager
.stopContext(context);
if (!rval) {
statusHandler.error("Context ["
+ context
.getName()
+ "] failed to stop");
}
} catch (Throwable e) {
statusHandler.fatal(
"Error occurred stopping context: "
+ context
.getName(),
e);
}
return new Pair<CamelContext, Boolean>(
context, rval);
}
}));
}
}
failures = waitForCallbacks(callbacks,
"Waiting for contexts to shutdown: ", 1000);
for (CamelContext failure : failures) {
@ -461,11 +362,72 @@ public class ContextManager implements ApplicationContextAware,
+ "] had a failure trying to stop");
}
} catch (Throwable e) {
statusHandler.fatal("Error occurred during shutdown", e);
statusHandler.error("Error occurred during shutdown", e);
}
}
}
/**
* Private Callable for stopping a context. If context not immediately
* stoppable will instead shutdown external routes first.
*/
private class StopContext implements Callable<Pair<CamelContext, Boolean>> {
final CamelContext context;
private StopContext(CamelContext context) {
this.context = context;
}
@Override
public Pair<CamelContext, Boolean> call() throws Exception {
boolean rval = false;
IContextStateManager stateManager = getStateManager(context);
if (stateManager.isContextStoppable(context)) {
try {
statusHandler.info("Stopping context [" + context.getName()
+ "]");
rval = stateManager.stopContext(context);
if (!rval) {
statusHandler.error("Context [" + context.getName()
+ "] failed to stop");
}
} catch (Throwable e) {
statusHandler.fatal("Error occurred stopping context: "
+ context.getName(), e);
}
} else {
/*
* context not immediately stoppable, begin shutting down
* external routes instead
*/
List<Route> routes = context.getRoutes();
rval = true;
for (Route route : routes) {
String uri = route.getEndpoint().getEndpointUri();
Pair<String, String> typeAndName = ContextData
.getEndpointTypeAndName(uri);
String type = typeAndName.getFirst();
if (!internalEndpointTypes.contains(type)) {
try {
statusHandler.info("Stopping route ["
+ route.getId() + "]");
rval &= stateManager.stopRoute(route);
} catch (Exception e) {
statusHandler.error(
"Error occurred Stopping route: "
+ route.getId(), e);
}
}
}
}
return new Pair<CamelContext, Boolean>(context, rval);
}
}
/**
* Waits for all callbacks to finish printing a periodic message with number
* of remaining callbacks. Returns a list of contexts that had a failure

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Bufrobs
Bundle-SymbolicName: com.raytheon.uf.edex.plugin.bufrobs
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: com.raytheon.uf.common.nc.bufr,

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Ccfp Plug-in
Bundle-SymbolicName: com.raytheon.uf.edex.plugin.ccfp
Bundle-Version: 1.14.0
Bundle-Version: 1.14.0.qualifier
Eclipse-RegisterBuddy: com.raytheon.uf.common.serialization, com.raytheon.edex.common
Bundle-Vendor: RAYTHEON
Require-Bundle: com.raytheon.edex.common,

View file

@ -72,11 +72,13 @@ public class ModelSoundingPersistenceManager implements IContextStateProcessor {
public void run() {
run = true;
while (run) {
// continue as long as there is data in the map
while (run || !containerMap.isEmpty()) {
try {
ModelSoundingStorageContainer container = null;
synchronized (containerMap) {
while (containerMap.isEmpty() && run) {
while (run && containerMap.isEmpty()) {
try {
containerMap.wait();
} catch (InterruptedException e) {
@ -201,9 +203,8 @@ public class ModelSoundingPersistenceManager implements IContextStateProcessor {
@Override
public void preStop() {
if (run) {
run = false;
synchronized (containerMap) {
run = false;
containerMap.notifyAll();
while (!shutdown) {

View file

@ -117,8 +117,7 @@ public class StatsDao extends SessionManagedDao<Integer, StatsRecord> {
sess = template.getSessionFactory().openStatelessSession();
// vacuum can't run within a transaction, hack to allow vacuum to
// run from within hibernate
Query query = sess
.createSQLQuery("rollback; VACUUM ANALYZE events.stats");
Query query = sess.createSQLQuery("rollback; VACUUM events.stats");
query.executeUpdate();
statusHandler.info("stats vacuumed");
} catch (Exception e) {