Issue #2862 The DatabaseArchiver and ArhivePurger now have write cluster locks.
Change-Id: I067b6037d5b044229f2c1e5acdd7ec5f7a372908 Former-commit-id:76883956dd
[formerly032ee7ef77
] [formerly226b1be015
] [formerly76883956dd
[formerly032ee7ef77
] [formerly226b1be015
] [formerlyc86d738d5b
[formerly226b1be015
[formerly 09e4f116a482f2a197c01cdec3609e10e1d2285d]]]] Former-commit-id:c86d738d5b
Former-commit-id:e6938244b4
[formerlyeec1d22987
] [formerly 553d6ed782fb095faff61739f4995edf91a39016 [formerly0d85ae72e0
]] Former-commit-id: 60458c98c7fe869e82691802e91bc0911c1ddf00 [formerly388ea1349a
] Former-commit-id:6508e30611
This commit is contained in:
parent
d10f5398f1
commit
d80872ab60
10 changed files with 884 additions and 332 deletions
|
@ -90,6 +90,7 @@ import com.raytheon.uf.common.util.FileUtil;
|
||||||
* Dec 04, 2013 2603 rferrel Changes to improve archive purging.
|
* Dec 04, 2013 2603 rferrel Changes to improve archive purging.
|
||||||
* Dec 17, 2013 2603 rjpeter Fix directory purging.
|
* Dec 17, 2013 2603 rjpeter Fix directory purging.
|
||||||
* Mar 27, 2014 2790 rferrel Detect problems when several purges running at the same time.
|
* Mar 27, 2014 2790 rferrel Detect problems when several purges running at the same time.
|
||||||
|
* Apr 01, 2014 2862 rferrel Moved purge only routines to ArchivePurgeManager.
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author rferrel
|
* @author rferrel
|
||||||
|
@ -113,9 +114,6 @@ public class ArchiveConfigManager {
|
||||||
/** Mapping of archive configuration data keyed to the name. */
|
/** Mapping of archive configuration data keyed to the name. */
|
||||||
private final Map<String, ArchiveConfig> archiveMap = new HashMap<String, ArchiveConfig>();
|
private final Map<String, ArchiveConfig> archiveMap = new HashMap<String, ArchiveConfig>();
|
||||||
|
|
||||||
/** Limit number of times message is sent. */
|
|
||||||
private boolean sentPurgeMessage = false;
|
|
||||||
|
|
||||||
/** Get the singleton. */
|
/** Get the singleton. */
|
||||||
public final static ArchiveConfigManager getInstance() {
|
public final static ArchiveConfigManager getInstance() {
|
||||||
return instance;
|
return instance;
|
||||||
|
@ -305,298 +303,6 @@ public class ArchiveConfigManager {
|
||||||
return new File(newArchivePathString);
|
return new File(newArchivePathString);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Purge the Files that fall outside of the time frame constraints for the
|
|
||||||
* archive. This will always leave the archive's top level directories even
|
|
||||||
* when they are empty.
|
|
||||||
*
|
|
||||||
* @param archive
|
|
||||||
* @return purgeCount
|
|
||||||
*/
|
|
||||||
public int purgeExpiredFromArchive(ArchiveConfig archive) {
|
|
||||||
String archiveRootDirPath = archive.getRootDir();
|
|
||||||
File archiveRootDir = new File(archiveRootDirPath);
|
|
||||||
int purgeCount = 0;
|
|
||||||
sentPurgeMessage = false;
|
|
||||||
|
|
||||||
if (!archiveRootDir.isDirectory()) {
|
|
||||||
statusHandler.error(archiveRootDir.getAbsolutePath()
|
|
||||||
+ " not a directory.");
|
|
||||||
return purgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
|
||||||
statusHandler.info("Purging directory: \""
|
|
||||||
+ archiveRootDir.getAbsolutePath() + "\".");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
|
||||||
String message = String.format(
|
|
||||||
"Start setup of category date helpers for archive: %s.",
|
|
||||||
archive.getName());
|
|
||||||
statusHandler.debug(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<CategoryConfig, CategoryFileDateHelper> helperMap = new HashMap<CategoryConfig, CategoryFileDateHelper>();
|
|
||||||
for (CategoryConfig category : archive.getCategoryList()) {
|
|
||||||
CategoryFileDateHelper helper = new CategoryFileDateHelper(
|
|
||||||
archiveRootDirPath, category);
|
|
||||||
helperMap.put(category, helper);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
|
||||||
String message = String.format(
|
|
||||||
"End setup of category date helpers for archive: %s.",
|
|
||||||
archive.getName());
|
|
||||||
statusHandler.debug(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Calendar minPurgeTime = calculateExpiration(archive, null);
|
|
||||||
|
|
||||||
IOFileFilter defaultTimeFilter = new IOFileFilter() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean accept(File dir, String name) {
|
|
||||||
File file = new File(dir, name);
|
|
||||||
return accept(file);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean accept(File file) {
|
|
||||||
Calendar time = TimeUtil.newGmtCalendar();
|
|
||||||
time.setTimeInMillis(file.lastModified());
|
|
||||||
return time.compareTo(minPurgeTime) < 0;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
File[] topLevelFiles = archiveRootDir.listFiles();
|
|
||||||
for (File topFile : topLevelFiles) {
|
|
||||||
// In top level directory ignore all hidden files and directories.
|
|
||||||
if (!topFile.isHidden()) {
|
|
||||||
if (topFile.isDirectory()) {
|
|
||||||
boolean isInCategory = false;
|
|
||||||
for (CategoryConfig category : archive.getCategoryList()) {
|
|
||||||
CategoryFileDateHelper helper = helperMap.get(category);
|
|
||||||
|
|
||||||
if (helper.isCategoryDirectory(topFile.getName())) {
|
|
||||||
isInCategory = true;
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
|
||||||
String message = String
|
|
||||||
.format("Start purge of category %s - %s, directory \"%s\".",
|
|
||||||
archive.getName(),
|
|
||||||
category.getName(),
|
|
||||||
topFile.getAbsolutePath());
|
|
||||||
statusHandler.info(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Calendar extPurgeTime = calculateExpiration(
|
|
||||||
archive, category);
|
|
||||||
int pc = purgeDir(topFile, defaultTimeFilter,
|
|
||||||
minPurgeTime, extPurgeTime, helper,
|
|
||||||
category);
|
|
||||||
purgeCount += pc;
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
|
||||||
String message = String
|
|
||||||
.format("End purge of category %s - %s, directory \"%s\", deleted %d files and directories.",
|
|
||||||
archive.getName(),
|
|
||||||
category.getName(),
|
|
||||||
topFile.getAbsolutePath(), pc);
|
|
||||||
statusHandler.info(message);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (isInCategory == false) {
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
|
||||||
String message = String.format(
|
|
||||||
"Start purge of directory: \"%s\".",
|
|
||||||
topFile.getAbsolutePath());
|
|
||||||
statusHandler.info(message);
|
|
||||||
}
|
|
||||||
int pc = purgeDir(topFile, defaultTimeFilter);
|
|
||||||
purgeCount += pc;
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
|
||||||
String message = String
|
|
||||||
.format("End purge of directory: \"%s\", deleted %d files and directories.",
|
|
||||||
topFile.getAbsolutePath(), pc);
|
|
||||||
statusHandler.info(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (defaultTimeFilter.accept(topFile)) {
|
|
||||||
purgeCount += deleteFile(topFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return purgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send race condition message out only one time per purge request.
|
|
||||||
*/
|
|
||||||
private void sendPurgeMessage() {
|
|
||||||
if (!sentPurgeMessage) {
|
|
||||||
sentPurgeMessage = true;
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
|
|
||||||
String message = "Archive purge finding missing directory. Purge may be running on more then one EDEX server";
|
|
||||||
statusHandler.handle(Priority.PROBLEM, message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Purge the contents of a directory of expired data leaving a possibly
|
|
||||||
* empty directory.
|
|
||||||
*
|
|
||||||
* @param dir
|
|
||||||
* @param defaultTimeFilter
|
|
||||||
* @param minPurgeTime
|
|
||||||
* @param extPurgeTime
|
|
||||||
* @param helper
|
|
||||||
* @return purgerCount
|
|
||||||
*/
|
|
||||||
private int purgeDir(File dir, IOFileFilter defaultTimeFilter,
|
|
||||||
Calendar minPurgeTime, Calendar extPurgeTime,
|
|
||||||
CategoryFileDateHelper helper, CategoryConfig category) {
|
|
||||||
int purgeCount = 0;
|
|
||||||
|
|
||||||
File[] dirFiles = dir.listFiles();
|
|
||||||
if (dirFiles == null) {
|
|
||||||
sendPurgeMessage();
|
|
||||||
return purgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (File file : dirFiles) {
|
|
||||||
if (!file.isHidden()) {
|
|
||||||
DataSetStatus status = helper.getFileDate(file);
|
|
||||||
if (status.isInDataSet()) {
|
|
||||||
Collection<String> labels = category
|
|
||||||
.getSelectedDisplayNames();
|
|
||||||
boolean isSelected = false;
|
|
||||||
for (String label : status.getDisplayLabels()) {
|
|
||||||
if (labels.contains(label)) {
|
|
||||||
isSelected = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Calendar checkTime = (isSelected ? extPurgeTime
|
|
||||||
: minPurgeTime);
|
|
||||||
Calendar fileTime = status.getTime();
|
|
||||||
boolean purge = fileTime.compareTo(checkTime) < 0;
|
|
||||||
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
|
||||||
String message = String
|
|
||||||
.format("%s [%s] category [%s] %s retention [%s] checkTime [%s] = %s.",
|
|
||||||
(file.isDirectory() ? "Directory"
|
|
||||||
: "File"), file
|
|
||||||
.getAbsoluteFile(), category
|
|
||||||
.getName(), (isSelected ? "ext"
|
|
||||||
: "min"), TimeUtil
|
|
||||||
.formatCalendar(checkTime),
|
|
||||||
TimeUtil.formatCalendar(fileTime),
|
|
||||||
(purge ? "purge" : "retain"));
|
|
||||||
statusHandler.debug(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (purge) {
|
|
||||||
if (file.isDirectory()) {
|
|
||||||
purgeCount += purgeDir(file,
|
|
||||||
FileFilterUtils.trueFileFilter());
|
|
||||||
}
|
|
||||||
purgeCount += deleteFile(file);
|
|
||||||
}
|
|
||||||
} else if (file.isDirectory()) {
|
|
||||||
purgeCount += purgeDir(file, defaultTimeFilter,
|
|
||||||
minPurgeTime, extPurgeTime, helper, category);
|
|
||||||
if (file.list().length == 0) {
|
|
||||||
purgeCount += deleteFile(file);
|
|
||||||
}
|
|
||||||
} else if (defaultTimeFilter.accept(file)) {
|
|
||||||
purgeCount += deleteFile(file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return purgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Recursively purge the contents of a directory based on the filter. The
|
|
||||||
* directory in the initial call is not deleted. This may result in an empty
|
|
||||||
* directory which is the desired result for top level directories.
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* @param dir
|
|
||||||
* @param fileDataFilter
|
|
||||||
* @return purgeCount
|
|
||||||
*/
|
|
||||||
private int purgeDir(File dir, IOFileFilter fileDataFilter) {
|
|
||||||
int purgeCount = 0;
|
|
||||||
File[] dirFiles = dir.listFiles();
|
|
||||||
if (dirFiles == null) {
|
|
||||||
sendPurgeMessage();
|
|
||||||
} else {
|
|
||||||
for (File file : dirFiles) {
|
|
||||||
if (!file.isHidden()) {
|
|
||||||
if (file.isDirectory()) {
|
|
||||||
purgeCount += purgeDir(file, fileDataFilter);
|
|
||||||
if (file.list().length == 0) {
|
|
||||||
purgeCount += deleteFile(file);
|
|
||||||
}
|
|
||||||
} else if (fileDataFilter.accept(file)) {
|
|
||||||
purgeCount += deleteFile(file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return purgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Delete a file or directory.
|
|
||||||
*
|
|
||||||
* @param file
|
|
||||||
* @return purgeCount
|
|
||||||
*/
|
|
||||||
private int deleteFile(File file) {
|
|
||||||
int purgeCount = 0;
|
|
||||||
boolean isDir = file.isDirectory();
|
|
||||||
if (file.delete()) {
|
|
||||||
++purgeCount;
|
|
||||||
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
|
||||||
statusHandler
|
|
||||||
.debug(String.format("Purged %s: \"%s\"",
|
|
||||||
(isDir ? "directory" : "file"),
|
|
||||||
file.getAbsolutePath()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
statusHandler.warn(String.format("Failed to purge %s: \"%s\"",
|
|
||||||
(isDir ? "directory" : "file"), file.getAbsolutePath()));
|
|
||||||
}
|
|
||||||
return purgeCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get expiration time for the category.
|
|
||||||
*
|
|
||||||
* @param archive
|
|
||||||
* @param category
|
|
||||||
* @return expireCal
|
|
||||||
*/
|
|
||||||
private Calendar calculateExpiration(ArchiveConfig archive,
|
|
||||||
CategoryConfig category) {
|
|
||||||
Calendar expireCal = TimeUtil.newGmtCalendar();
|
|
||||||
int retHours = (category == null)
|
|
||||||
|| (category.getRetentionHours() == 0) ? archive
|
|
||||||
.getRetentionHours() : category.getRetentionHours();
|
|
||||||
if (retHours != 0) {
|
|
||||||
expireCal.add(Calendar.HOUR, (-1) * retHours);
|
|
||||||
}
|
|
||||||
return expireCal;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the archive with the given name; return null if it does not
|
* @return the archive with the given name; return null if it does not
|
||||||
* exist.
|
* exist.
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<classpath>
|
<classpath>
|
||||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
|
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
|
||||||
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
|
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
|
||||||
<classpathentry kind="src" path="src"/>
|
<classpathentry kind="src" path="src"/>
|
||||||
<classpathentry combineaccessrules="false" kind="src" path="/com.raytheon.uf.common.plugin.nwsauth"/>
|
<classpathentry combineaccessrules="false" kind="src" path="/com.raytheon.uf.common.plugin.nwsauth"/>
|
||||||
<classpathentry kind="output" path="bin"/>
|
<classpathentry kind="output" path="bin"/>
|
||||||
</classpath>
|
</classpath>
|
||||||
|
|
|
@ -20,4 +20,5 @@ Require-Bundle: com.raytheon.uf.common.auth;bundle-version="1.12.1174",
|
||||||
com.raytheon.uf.common.time,
|
com.raytheon.uf.common.time,
|
||||||
com.raytheon.uf.common.util;bundle-version="1.12.1174",
|
com.raytheon.uf.common.util;bundle-version="1.12.1174",
|
||||||
com.raytheon.uf.edex.auth;bundle-version="1.12.1174",
|
com.raytheon.uf.edex.auth;bundle-version="1.12.1174",
|
||||||
com.raytheon.uf.edex.core
|
com.raytheon.uf.edex.core,
|
||||||
|
org.apache.commons.io;bundle-version="2.4.0"
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
**/
|
**/
|
||||||
package com.raytheon.uf.edex.archive;
|
package com.raytheon.uf.edex.archive;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
|
@ -26,12 +27,15 @@ import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
|
||||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||||
import com.raytheon.uf.common.dataplugin.PluginException;
|
import com.raytheon.uf.common.dataplugin.PluginException;
|
||||||
import com.raytheon.uf.common.dataplugin.PluginProperties;
|
import com.raytheon.uf.common.dataplugin.PluginProperties;
|
||||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
import com.raytheon.uf.common.status.UFStatus;
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
|
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||||
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
|
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
|
||||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||||
|
@ -39,6 +43,8 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||||
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
import com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler;
|
import com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler.LockType;
|
||||||
import com.raytheon.uf.edex.database.plugin.PluginDao;
|
import com.raytheon.uf.edex.database.plugin.PluginDao;
|
||||||
import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
||||||
|
|
||||||
|
@ -59,6 +65,7 @@ import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
||||||
* Nov 11, 2013 2478 rjpeter Updated data store copy to always copy hdf5.
|
* Nov 11, 2013 2478 rjpeter Updated data store copy to always copy hdf5.
|
||||||
* Dec 13, 2013 2555 rjpeter Refactored logic into DatabaseArchiveProcessor.
|
* Dec 13, 2013 2555 rjpeter Refactored logic into DatabaseArchiveProcessor.
|
||||||
* Feb 12, 2014 2784 rjpeter Fixed clusterLock to not update the time by default.
|
* Feb 12, 2014 2784 rjpeter Fixed clusterLock to not update the time by default.
|
||||||
|
* Apr 01, 2014 2862 rferrel Add exclusive lock at plug-in level.
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author rjpeter
|
* @author rjpeter
|
||||||
|
@ -108,6 +115,23 @@ public class DatabaseArchiver implements IPluginArchiver {
|
||||||
|
|
||||||
private final boolean compressDatabaseFiles;
|
private final boolean compressDatabaseFiles;
|
||||||
|
|
||||||
|
/** Task to update the lock time for the locked plugin cluster task. */
|
||||||
|
private static final class LockUpdateTask extends TimerTask {
|
||||||
|
/** The locked cluster task's details. */
|
||||||
|
private final String details;
|
||||||
|
|
||||||
|
public LockUpdateTask(String details) {
|
||||||
|
this.details = details;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
ClusterLockUtils.updateLockTime(SharedLockHandler.name, details,
|
||||||
|
currentTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The constructor.
|
* The constructor.
|
||||||
*/
|
*/
|
||||||
|
@ -138,7 +162,61 @@ public class DatabaseArchiver implements IPluginArchiver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean archivePluginData(String pluginName, String archivePath) {
|
/**
|
||||||
|
* Attempt to get exclusive consumer's writer lock.
|
||||||
|
*
|
||||||
|
* @param details
|
||||||
|
* @return clusterTask when getting lock successful otherwise null
|
||||||
|
*/
|
||||||
|
private ClusterTask getWriteLock(String details) {
|
||||||
|
SharedLockHandler lockHandler = new SharedLockHandler(LockType.WRITER);
|
||||||
|
ClusterTask ct = ClusterLockUtils.lock(SharedLockHandler.name, details,
|
||||||
|
lockHandler, false);
|
||||||
|
if (LockState.SUCCESSFUL.equals(ct.getLockState())) {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.handle(Priority.INFO, String.format(
|
||||||
|
"Locked: \"%s\"", ct.getId().getDetails()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.handle(Priority.INFO, String.format(
|
||||||
|
"Skip database Archive unable to lock: \"%s\"", ct
|
||||||
|
.getId().getDetails()));
|
||||||
|
}
|
||||||
|
ct = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ct;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unlock the consumer's lock.
|
||||||
|
*
|
||||||
|
* @param ct
|
||||||
|
*/
|
||||||
|
private void releaseWriteLock(ClusterTask ct) {
|
||||||
|
if (ClusterLockUtils.unlock(ct, false)) {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.handle(Priority.INFO, String.format(
|
||||||
|
"Unlocked: \"%s\"", ct.getId().getDetails()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
|
||||||
|
statusHandler.handle(Priority.PROBLEM, String.format(
|
||||||
|
"Unable to unlock: \"%s\"", ct.getId().getDetails()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void archivePluginData(String pluginName, String archivePath) {
|
||||||
|
File archiveDir = new File(archivePath);
|
||||||
|
File pluginDir = new File(archiveDir, pluginName);
|
||||||
|
ClusterTask ctPlugin = getWriteLock(pluginDir.getAbsolutePath());
|
||||||
|
|
||||||
|
if (ctPlugin == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SimpleDateFormat dateFormat = TL_DATE_FORMAT.get();
|
SimpleDateFormat dateFormat = TL_DATE_FORMAT.get();
|
||||||
// set archive time
|
// set archive time
|
||||||
Calendar runTime = TimeUtil.newGmtCalendar();
|
Calendar runTime = TimeUtil.newGmtCalendar();
|
||||||
|
@ -150,7 +228,8 @@ public class DatabaseArchiver implements IPluginArchiver {
|
||||||
ClusterTask ct = ClusterLockUtils.lock(TASK_NAME, pluginName,
|
ClusterTask ct = ClusterLockUtils.lock(TASK_NAME, pluginName,
|
||||||
lockHandler, false);
|
lockHandler, false);
|
||||||
if (!LockState.SUCCESSFUL.equals(ct.getLockState())) {
|
if (!LockState.SUCCESSFUL.equals(ct.getLockState())) {
|
||||||
return true;
|
releaseWriteLock(ctPlugin);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// keep extra info the same until processing updates the time.
|
// keep extra info the same until processing updates the time.
|
||||||
|
@ -161,6 +240,11 @@ public class DatabaseArchiver implements IPluginArchiver {
|
||||||
int recordCount = 0;
|
int recordCount = 0;
|
||||||
statusHandler.info(pluginName + ": Archiving plugin");
|
statusHandler.info(pluginName + ": Archiving plugin");
|
||||||
|
|
||||||
|
Timer lockUpdateTimer = new Timer("Update Shared Lock Time", true);
|
||||||
|
TimerTask task = new LockUpdateTask(ctPlugin.getId().getDetails());
|
||||||
|
lockUpdateTimer.schedule(task, TimeUtil.MILLIS_PER_MINUTE,
|
||||||
|
TimeUtil.MILLIS_PER_MINUTE);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// lookup dao
|
// lookup dao
|
||||||
PluginDao dao = null;
|
PluginDao dao = null;
|
||||||
|
@ -171,7 +255,7 @@ public class DatabaseArchiver implements IPluginArchiver {
|
||||||
.error(pluginName
|
.error(pluginName
|
||||||
+ ": Error getting data access object! Unable to archive data!",
|
+ ": Error getting data access object! Unable to archive data!",
|
||||||
e);
|
e);
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
startTime = determineStartTime(pluginName, ct.getExtraInfo(),
|
startTime = determineStartTime(pluginName, ct.getExtraInfo(),
|
||||||
|
@ -236,9 +320,21 @@ public class DatabaseArchiver implements IPluginArchiver {
|
||||||
// release lock setting archive time in cluster lock
|
// release lock setting archive time in cluster lock
|
||||||
ClusterLockUtils.unlock(ct, false);
|
ClusterLockUtils.unlock(ct, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Stop updating ctPlugin's last execution time before releasing the
|
||||||
|
* cluster's lock.
|
||||||
|
*/
|
||||||
|
if (lockUpdateTimer != null) {
|
||||||
|
lockUpdateTimer.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctPlugin != null) {
|
||||||
|
releaseWriteLock(ctPlugin);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,503 @@
|
||||||
|
/**
|
||||||
|
* This software was developed and / or modified by Raytheon Company,
|
||||||
|
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||||
|
*
|
||||||
|
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||||
|
* This software product contains export-restricted data whose
|
||||||
|
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||||
|
* to non-U.S. persons whether in the United States or abroad requires
|
||||||
|
* an export license or other authorization.
|
||||||
|
*
|
||||||
|
* Contractor Name: Raytheon Company
|
||||||
|
* Contractor Address: 6825 Pine Street, Suite 340
|
||||||
|
* Mail Stop B8
|
||||||
|
* Omaha, NE 68106
|
||||||
|
* 402.291.0100
|
||||||
|
*
|
||||||
|
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||||
|
* further licensing information.
|
||||||
|
**/
|
||||||
|
package com.raytheon.uf.edex.archive.purge;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Calendar;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.io.filefilter.FileFilterUtils;
|
||||||
|
import org.apache.commons.io.filefilter.IOFileFilter;
|
||||||
|
|
||||||
|
import com.raytheon.uf.common.archive.config.ArchiveConfig;
|
||||||
|
import com.raytheon.uf.common.archive.config.ArchiveConfigManager;
|
||||||
|
import com.raytheon.uf.common.archive.config.CategoryConfig;
|
||||||
|
import com.raytheon.uf.common.archive.config.CategoryFileDateHelper;
|
||||||
|
import com.raytheon.uf.common.archive.config.DataSetStatus;
|
||||||
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
|
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||||
|
import com.raytheon.uf.common.time.util.ITimer;
|
||||||
|
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.ClusterTaskPK;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler.LockType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class coordinates cluster locking of plug-in directories while
|
||||||
|
* performing the archive purge.
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
*
|
||||||
|
* SOFTWARE HISTORY
|
||||||
|
*
|
||||||
|
* Date Ticket# Engineer Description
|
||||||
|
* ------------ ---------- ----------- --------------------------
|
||||||
|
* Apr 01, 2014 2862 rferrel Initial creation
|
||||||
|
*
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* @author rferrel
|
||||||
|
* @version 1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class ArchivePurgeManager {
|
||||||
|
private final IUFStatusHandler statusHandler = UFStatus
|
||||||
|
.getHandler(ArchivePurgeManager.class);
|
||||||
|
|
||||||
|
/** Single instance of the manager. */
|
||||||
|
private final static ArchivePurgeManager instance = new ArchivePurgeManager();
|
||||||
|
|
||||||
|
/** Manger to handle archive configuration information. */
|
||||||
|
private final ArchiveConfigManager manager;
|
||||||
|
|
||||||
|
/** Limit number of times message is sent. */
|
||||||
|
private boolean sentPurgeMessage = false;
|
||||||
|
|
||||||
|
/** Prevent flooding of lock updates on cluster lock. */
|
||||||
|
private final ITimer lockUpdateTimer = TimeUtil.getTimer();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Private constructor for singleton.
|
||||||
|
*/
|
||||||
|
private ArchivePurgeManager() {
|
||||||
|
manager = ArchiveConfigManager.getInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return instance
|
||||||
|
*/
|
||||||
|
public static ArchivePurgeManager getInstance() {
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Purge the Files that fall outside of the time frame constraints for the
|
||||||
|
* archive. This will always leave the archive's top level directories even
|
||||||
|
* when they are empty.
|
||||||
|
*
|
||||||
|
* @param archive
|
||||||
|
* @return purgeCount
|
||||||
|
*/
|
||||||
|
public int purgeExpiredFromArchive(ArchiveConfig archive) {
|
||||||
|
String archiveRootDirPath = archive.getRootDir();
|
||||||
|
File archiveRootDir = new File(archiveRootDirPath);
|
||||||
|
|
||||||
|
int purgeCount = 0;
|
||||||
|
sentPurgeMessage = false;
|
||||||
|
|
||||||
|
if (!archiveRootDir.isDirectory()) {
|
||||||
|
statusHandler.error(archiveRootDir.getAbsolutePath()
|
||||||
|
+ " not a directory.");
|
||||||
|
return purgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.info("Purging directory: \""
|
||||||
|
+ archiveRootDir.getAbsolutePath() + "\".");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||||
|
String message = String.format(
|
||||||
|
"Start setup of category date helpers for archive: %s.",
|
||||||
|
archive.getName());
|
||||||
|
statusHandler.debug(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<CategoryConfig, CategoryFileDateHelper> helperMap = new HashMap<CategoryConfig, CategoryFileDateHelper>();
|
||||||
|
for (CategoryConfig category : archive.getCategoryList()) {
|
||||||
|
CategoryFileDateHelper helper = new CategoryFileDateHelper(
|
||||||
|
archiveRootDirPath, category);
|
||||||
|
helperMap.put(category, helper);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||||
|
String message = String.format(
|
||||||
|
"End setup of category date helpers for archive: %s.",
|
||||||
|
archive.getName());
|
||||||
|
statusHandler.debug(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Calendar minPurgeTime = calculateExpiration(archive, null);
|
||||||
|
|
||||||
|
IOFileFilter defaultTimeFilter = new IOFileFilter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean accept(File dir, String name) {
|
||||||
|
File file = new File(dir, name);
|
||||||
|
return accept(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean accept(File file) {
|
||||||
|
Calendar time = TimeUtil.newGmtCalendar();
|
||||||
|
time.setTimeInMillis(file.lastModified());
|
||||||
|
return time.compareTo(minPurgeTime) < 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
List<File> topLevelFiles = new LinkedList<File>(
|
||||||
|
Arrays.asList(archiveRootDir.listFiles()));
|
||||||
|
|
||||||
|
int previousSize = -1;
|
||||||
|
|
||||||
|
// Keep looping as long as we keep getting locks or list is empty.
|
||||||
|
while ((topLevelFiles.size() > 0)
|
||||||
|
&& (previousSize != topLevelFiles.size())) {
|
||||||
|
previousSize = topLevelFiles.size();
|
||||||
|
Iterator<File> topLevelIter = topLevelFiles.iterator();
|
||||||
|
|
||||||
|
while (topLevelIter.hasNext()) {
|
||||||
|
File topFile = topLevelIter.next();
|
||||||
|
/*
|
||||||
|
* In top level directory ignore all hidden files and
|
||||||
|
* directories.
|
||||||
|
*/
|
||||||
|
ClusterTask ct = null;
|
||||||
|
if (topFile.isHidden()) {
|
||||||
|
topLevelIter.remove();
|
||||||
|
} else {
|
||||||
|
if (topFile.isDirectory()) {
|
||||||
|
ct = getWriteLock(topFile.getAbsolutePath());
|
||||||
|
if (ct == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
topLevelIter.remove();
|
||||||
|
boolean isInCategory = false;
|
||||||
|
for (CategoryConfig category : archive
|
||||||
|
.getCategoryList()) {
|
||||||
|
CategoryFileDateHelper helper = helperMap
|
||||||
|
.get(category);
|
||||||
|
|
||||||
|
if (helper.isCategoryDirectory(topFile.getName())) {
|
||||||
|
isInCategory = true;
|
||||||
|
if (statusHandler
|
||||||
|
.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
String message = String
|
||||||
|
.format("Start purge of category %s - %s, directory \"%s\".",
|
||||||
|
archive.getName(),
|
||||||
|
category.getName(),
|
||||||
|
topFile.getAbsolutePath());
|
||||||
|
statusHandler.info(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Calendar extPurgeTime = calculateExpiration(
|
||||||
|
archive, category);
|
||||||
|
int pc = purgeDir(topFile, defaultTimeFilter,
|
||||||
|
minPurgeTime, extPurgeTime, helper,
|
||||||
|
category, ct);
|
||||||
|
purgeCount += pc;
|
||||||
|
if (statusHandler
|
||||||
|
.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
String message = String
|
||||||
|
.format("End purge of category %s - %s, directory \"%s\", deleted %d files and directories.",
|
||||||
|
archive.getName(),
|
||||||
|
category.getName(),
|
||||||
|
topFile.getAbsolutePath(),
|
||||||
|
pc);
|
||||||
|
statusHandler.info(message);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (isInCategory == false) {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
String message = String.format(
|
||||||
|
"Start purge of directory: \"%s\".",
|
||||||
|
topFile.getAbsolutePath());
|
||||||
|
statusHandler.info(message);
|
||||||
|
}
|
||||||
|
int pc = purgeDir(topFile, defaultTimeFilter);
|
||||||
|
purgeCount += pc;
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
String message = String
|
||||||
|
.format("End purge of directory: \"%s\", deleted %d files and directories.",
|
||||||
|
topFile.getAbsolutePath(), pc);
|
||||||
|
statusHandler.info(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
releaseWriteLock(ct);
|
||||||
|
ct = null;
|
||||||
|
} else {
|
||||||
|
if (defaultTimeFilter.accept(topFile)) {
|
||||||
|
purgeCount += deleteFile(topFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return purgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to get exclusive write lock.
|
||||||
|
*
|
||||||
|
* @param details
|
||||||
|
* @return clusterTask when getting lock successful otherwise null
|
||||||
|
*/
|
||||||
|
private ClusterTask getWriteLock(String details) {
|
||||||
|
SharedLockHandler lockHandler = new SharedLockHandler(LockType.WRITER);
|
||||||
|
ClusterTask ct = ClusterLockUtils.lock(SharedLockHandler.name, details,
|
||||||
|
lockHandler, false);
|
||||||
|
if (ct.getLockState().equals(LockState.SUCCESSFUL)) {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.handle(Priority.INFO, String.format(
|
||||||
|
"Locked: \"%s\"", ct.getId().getDetails()));
|
||||||
|
}
|
||||||
|
lockUpdateTimer.reset();
|
||||||
|
lockUpdateTimer.start();
|
||||||
|
} else {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.handle(Priority.INFO, String.format(
|
||||||
|
"Skip purge unable to lock: \"%s\"", ct.getId()
|
||||||
|
.getDetails()));
|
||||||
|
}
|
||||||
|
ct = null;
|
||||||
|
}
|
||||||
|
return ct;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unlock the consumer's lock.
|
||||||
|
*/
|
||||||
|
private void releaseWriteLock(ClusterTask ct) {
|
||||||
|
if (ClusterLockUtils.unlock(ct, false)) {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
|
statusHandler.handle(Priority.INFO, String.format(
|
||||||
|
"Unlocked: \"%s\"", ct.getId().getDetails()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
|
||||||
|
statusHandler.handle(Priority.PROBLEM, String.format(
|
||||||
|
"Unable to unlock: \"%s\"", ct.getId().getDetails()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lockUpdateTimer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateLockTime(ClusterTask ct) {
|
||||||
|
ClusterTaskPK id = ct.getId();
|
||||||
|
|
||||||
|
// Slow down the rate at which we hit the database.
|
||||||
|
if (lockUpdateTimer.getElapsedTime() >= TimeUtil.MILLIS_PER_MINUTE) {
|
||||||
|
lockUpdateTimer.stop();
|
||||||
|
lockUpdateTimer.reset();
|
||||||
|
lockUpdateTimer.start();
|
||||||
|
ClusterLockUtils.updateLockTime(id.getName(), id.getDetails(),
|
||||||
|
System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Purge the contents of a directory of expired data leaving a possibly
|
||||||
|
* empty directory.
|
||||||
|
*
|
||||||
|
* @param dir
|
||||||
|
* @param defaultTimeFilter
|
||||||
|
* @param minPurgeTime
|
||||||
|
* @param extPurgeTime
|
||||||
|
* @param helper
|
||||||
|
* @param category
|
||||||
|
* @param ct
|
||||||
|
* @return purgeCount
|
||||||
|
*/
|
||||||
|
private int purgeDir(File dir, IOFileFilter defaultTimeFilter,
|
||||||
|
Calendar minPurgeTime, Calendar extPurgeTime,
|
||||||
|
CategoryFileDateHelper helper, CategoryConfig category,
|
||||||
|
ClusterTask ct) {
|
||||||
|
int purgeCount = 0;
|
||||||
|
|
||||||
|
File[] dirFiles = dir.listFiles();
|
||||||
|
if (dirFiles == null) {
|
||||||
|
sendPurgeMessage();
|
||||||
|
return purgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (File file : dirFiles) {
|
||||||
|
updateLockTime(ct);
|
||||||
|
|
||||||
|
if (!file.isHidden()) {
|
||||||
|
DataSetStatus status = helper.getFileDate(file);
|
||||||
|
if (status.isInDataSet()) {
|
||||||
|
Collection<String> labels = category
|
||||||
|
.getSelectedDisplayNames();
|
||||||
|
boolean isSelected = false;
|
||||||
|
for (String label : status.getDisplayLabels()) {
|
||||||
|
if (labels.contains(label)) {
|
||||||
|
isSelected = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Calendar checkTime = (isSelected ? extPurgeTime
|
||||||
|
: minPurgeTime);
|
||||||
|
Calendar fileTime = status.getTime();
|
||||||
|
boolean purge = fileTime.compareTo(checkTime) < 0;
|
||||||
|
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||||
|
String message = String
|
||||||
|
.format("%s [%s] category [%s] %s retention [%s] checkTime [%s] = %s.",
|
||||||
|
(file.isDirectory() ? "Directory"
|
||||||
|
: "File"), file
|
||||||
|
.getAbsoluteFile(), category
|
||||||
|
.getName(), (isSelected ? "ext"
|
||||||
|
: "min"), TimeUtil
|
||||||
|
.formatCalendar(checkTime),
|
||||||
|
TimeUtil.formatCalendar(fileTime),
|
||||||
|
(purge ? "purge" : "retain"));
|
||||||
|
statusHandler.debug(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (purge) {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
purgeCount += purgeDir(file,
|
||||||
|
FileFilterUtils.trueFileFilter());
|
||||||
|
}
|
||||||
|
purgeCount += deleteFile(file);
|
||||||
|
}
|
||||||
|
} else if (file.isDirectory()) {
|
||||||
|
purgeCount += purgeDir(file, defaultTimeFilter,
|
||||||
|
minPurgeTime, extPurgeTime, helper, category, ct);
|
||||||
|
if (file.list().length == 0) {
|
||||||
|
purgeCount += deleteFile(file);
|
||||||
|
}
|
||||||
|
} else if (defaultTimeFilter.accept(file)) {
|
||||||
|
purgeCount += deleteFile(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return purgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively purge the contents of a directory based on the filter. The
|
||||||
|
* directory in the initial call is not deleted. This may result in an empty
|
||||||
|
* directory which is the desired result for top level directories.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @param dir
|
||||||
|
* @param fileDataFilter
|
||||||
|
* @return purgeCount
|
||||||
|
*/
|
||||||
|
private int purgeDir(File dir, IOFileFilter fileDataFilter) {
|
||||||
|
int purgeCount = 0;
|
||||||
|
File[] dirFiles = dir.listFiles();
|
||||||
|
if (dirFiles == null) {
|
||||||
|
sendPurgeMessage();
|
||||||
|
} else {
|
||||||
|
for (File file : dirFiles) {
|
||||||
|
if (!file.isHidden()) {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
purgeCount += purgeDir(file, fileDataFilter);
|
||||||
|
if (file.list().length == 0) {
|
||||||
|
purgeCount += deleteFile(file);
|
||||||
|
}
|
||||||
|
} else if (fileDataFilter.accept(file)) {
|
||||||
|
purgeCount += deleteFile(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return purgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete a file or directory.
|
||||||
|
*
|
||||||
|
* @param file
|
||||||
|
* @return purgeCount
|
||||||
|
*/
|
||||||
|
private int deleteFile(File file) {
|
||||||
|
int purgeCount = 0;
|
||||||
|
boolean isDir = file.isDirectory();
|
||||||
|
if (file.delete()) {
|
||||||
|
++purgeCount;
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
|
||||||
|
statusHandler
|
||||||
|
.debug(String.format("Purged %s: \"%s\"",
|
||||||
|
(isDir ? "directory" : "file"),
|
||||||
|
file.getAbsolutePath()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
statusHandler.warn(String.format("Failed to purge %s: \"%s\"",
|
||||||
|
(isDir ? "directory" : "file"), file.getAbsolutePath()));
|
||||||
|
}
|
||||||
|
return purgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get expiration time for the category.
|
||||||
|
*
|
||||||
|
* @param archive
|
||||||
|
* @param category
|
||||||
|
* @return expireCal
|
||||||
|
*/
|
||||||
|
private Calendar calculateExpiration(ArchiveConfig archive,
|
||||||
|
CategoryConfig category) {
|
||||||
|
Calendar expireCal = TimeUtil.newGmtCalendar();
|
||||||
|
int retHours = (category == null)
|
||||||
|
|| (category.getRetentionHours() == 0) ? archive
|
||||||
|
.getRetentionHours() : category.getRetentionHours();
|
||||||
|
if (retHours != 0) {
|
||||||
|
expireCal.add(Calendar.HOUR, (-1) * retHours);
|
||||||
|
}
|
||||||
|
return expireCal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send race condition message out only one time per purge request.
|
||||||
|
*/
|
||||||
|
private void sendPurgeMessage() {
|
||||||
|
if (!sentPurgeMessage) {
|
||||||
|
sentPurgeMessage = true;
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
|
||||||
|
String message = "Archive purge finding missing directory. Purge may be running on more then one EDEX server";
|
||||||
|
statusHandler.handle(Priority.PROBLEM, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get archives from the archive configuration manager.
|
||||||
|
*
|
||||||
|
* @return archives
|
||||||
|
*/
|
||||||
|
public Collection<ArchiveConfig> getArchives() {
|
||||||
|
return manager.getArchives();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform the archive configuration manager's reset.
|
||||||
|
*/
|
||||||
|
public void reset() {
|
||||||
|
manager.reset();
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,7 +22,6 @@ package com.raytheon.uf.edex.archive.purge;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import com.raytheon.uf.common.archive.config.ArchiveConfig;
|
import com.raytheon.uf.common.archive.config.ArchiveConfig;
|
||||||
import com.raytheon.uf.common.archive.config.ArchiveConfigManager;
|
|
||||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||||
import com.raytheon.uf.common.status.UFStatus;
|
import com.raytheon.uf.common.status.UFStatus;
|
||||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||||
|
@ -45,6 +44,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
|
||||||
* Sep 03, 2013 2224 rferrel Add check to enable/disable purger.
|
* Sep 03, 2013 2224 rferrel Add check to enable/disable purger.
|
||||||
* Nov 05, 2013 2499 rjpeter Repackaged
|
* Nov 05, 2013 2499 rjpeter Repackaged
|
||||||
* Dec 17, 2013 2603 rjpeter Reload configuration every run of purge.
|
* Dec 17, 2013 2603 rjpeter Reload configuration every run of purge.
|
||||||
|
* Apr 01, 2014 2862 rferrel Refactored to us ArchivePurgeManager.
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author bgonzale
|
* @author bgonzale
|
||||||
|
@ -67,7 +67,7 @@ public class ArchivePurger {
|
||||||
ITimer timer = TimeUtil.getTimer();
|
ITimer timer = TimeUtil.getTimer();
|
||||||
timer.start();
|
timer.start();
|
||||||
statusHandler.info("Archive Purge started.");
|
statusHandler.info("Archive Purge started.");
|
||||||
ArchiveConfigManager manager = ArchiveConfigManager.getInstance();
|
ArchivePurgeManager manager = ArchivePurgeManager.getInstance();
|
||||||
manager.reset();
|
manager.reset();
|
||||||
Collection<ArchiveConfig> archives = manager.getArchives();
|
Collection<ArchiveConfig> archives = manager.getArchives();
|
||||||
for (ArchiveConfig archive : archives) {
|
for (ArchiveConfig archive : archives) {
|
||||||
|
|
|
@ -473,8 +473,7 @@ public class ClusterLockUtils {
|
||||||
* reset to the epoch time. This can be useful when wanting the next check
|
* reset to the epoch time. This can be useful when wanting the next check
|
||||||
* to always succeed.
|
* to always succeed.
|
||||||
*
|
*
|
||||||
* @param taskName
|
* @param ct
|
||||||
* @param details
|
|
||||||
* @param clearTime
|
* @param clearTime
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
@ -533,7 +532,6 @@ public class ClusterLockUtils {
|
||||||
*
|
*
|
||||||
* @param taskName
|
* @param taskName
|
||||||
* @param details
|
* @param details
|
||||||
* @param clearTime
|
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static boolean unlock(String taskName, String details) {
|
public static boolean unlock(String taskName, String details) {
|
||||||
|
|
|
@ -32,7 +32,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||||
import com.raytheon.uf.edex.database.cluster.handler.IClusterLockHandler;
|
import com.raytheon.uf.edex.database.cluster.handler.IClusterLockHandler;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO Add Description
|
* The awips.cluser_task table.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
*
|
*
|
||||||
|
@ -40,6 +40,7 @@ import com.raytheon.uf.edex.database.cluster.handler.IClusterLockHandler;
|
||||||
* Date Ticket# Engineer Description
|
* Date Ticket# Engineer Description
|
||||||
* ------------ ---------- ----------- --------------------------
|
* ------------ ---------- ----------- --------------------------
|
||||||
* Feb 19, 2010 njensen Initial creation
|
* Feb 19, 2010 njensen Initial creation
|
||||||
|
* Apr 02, 2014 2862 rferrel Make lockHanler getter/setters public.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -128,11 +129,11 @@ public class ClusterTask implements ISerializableObject, Serializable {
|
||||||
this.lockState = lockState;
|
this.lockState = lockState;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected IClusterLockHandler getLockHandler() {
|
public IClusterLockHandler getLockHandler() {
|
||||||
return lockHandler;
|
return lockHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setLockHandler(IClusterLockHandler lockHandler) {
|
public void setLockHandler(IClusterLockHandler lockHandler) {
|
||||||
this.lockHandler = lockHandler;
|
this.lockHandler = lockHandler;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,8 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||||
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO Add Description
|
* This class allows a lock in the cluster task table to be overridden when the
|
||||||
|
* last execution time is older then the current time by a specified amount.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
*
|
*
|
||||||
|
@ -35,6 +36,8 @@ import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
* Date Ticket# Engineer Description
|
* Date Ticket# Engineer Description
|
||||||
* ------------ ---------- ----------- --------------------------
|
* ------------ ---------- ----------- --------------------------
|
||||||
* Nov 15, 2010 rjpeter Initial creation
|
* Nov 15, 2010 rjpeter Initial creation
|
||||||
|
* Apr 08, 2014 2862 rferrel Changes to properly allow statusHandler
|
||||||
|
* to pickup sub-class.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -43,8 +46,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
|
public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
|
||||||
private static final transient IUFStatusHandler handler = UFStatus
|
protected final IUFStatusHandler statusHandler;
|
||||||
.getHandler(CurrentTimeClusterLockHandler.class);
|
|
||||||
|
|
||||||
protected long checkTime;
|
protected long checkTime;
|
||||||
|
|
||||||
|
@ -62,8 +64,7 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
|
||||||
* @param extraInfo
|
* @param extraInfo
|
||||||
*/
|
*/
|
||||||
public CurrentTimeClusterLockHandler(long timeOutOverride, String extraInfo) {
|
public CurrentTimeClusterLockHandler(long timeOutOverride, String extraInfo) {
|
||||||
this.timeOutOverride = timeOutOverride;
|
this(timeOutOverride, extraInfo, true);
|
||||||
this.extraInfo = extraInfo;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,8 +77,7 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
|
||||||
*/
|
*/
|
||||||
public CurrentTimeClusterLockHandler(long timeOutOverride,
|
public CurrentTimeClusterLockHandler(long timeOutOverride,
|
||||||
boolean saveExtraInfoOnLock) {
|
boolean saveExtraInfoOnLock) {
|
||||||
this.timeOutOverride = timeOutOverride;
|
this(timeOutOverride, null, saveExtraInfoOnLock);
|
||||||
this.saveExtraInfoOnLock = saveExtraInfoOnLock;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -91,6 +91,7 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
|
||||||
*/
|
*/
|
||||||
public CurrentTimeClusterLockHandler(long timeOutOverride,
|
public CurrentTimeClusterLockHandler(long timeOutOverride,
|
||||||
String extraInfo, boolean saveExtraInfoOnLock) {
|
String extraInfo, boolean saveExtraInfoOnLock) {
|
||||||
|
this.statusHandler = UFStatus.getHandler(this.getClass());
|
||||||
this.timeOutOverride = timeOutOverride;
|
this.timeOutOverride = timeOutOverride;
|
||||||
this.extraInfo = extraInfo;
|
this.extraInfo = extraInfo;
|
||||||
this.saveExtraInfoOnLock = saveExtraInfoOnLock;
|
this.saveExtraInfoOnLock = saveExtraInfoOnLock;
|
||||||
|
@ -105,18 +106,19 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
|
||||||
ls = LockState.ALREADY_RUNNING;
|
ls = LockState.ALREADY_RUNNING;
|
||||||
// Override
|
// Override
|
||||||
if (checkTime > ct.getLastExecution() + timeOutOverride) {
|
if (checkTime > ct.getLastExecution() + timeOutOverride) {
|
||||||
if (handler.isPriorityEnabled(Priority.INFO)) {
|
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
|
||||||
handler.handle(
|
statusHandler
|
||||||
Priority.INFO,
|
.handle(Priority.INFO,
|
||||||
"Overriding lock for cluster task ["
|
"Overriding lock for cluster task ["
|
||||||
+ ct.getId().getName()
|
+ ct.getId().getName()
|
||||||
+ "/"
|
+ "/"
|
||||||
+ ct.getId().getDetails()
|
+ ct.getId().getDetails()
|
||||||
+ "] time out ["
|
+ "] time out ["
|
||||||
+ timeOutOverride
|
+ timeOutOverride
|
||||||
+ "] exceeded by "
|
+ "] exceeded by "
|
||||||
+ (checkTime - (ct.getLastExecution() + timeOutOverride))
|
+ (checkTime - (ct
|
||||||
+ " ms.");
|
.getLastExecution() + timeOutOverride))
|
||||||
|
+ " ms.");
|
||||||
}
|
}
|
||||||
ls = LockState.SUCCESSFUL;
|
ls = LockState.SUCCESSFUL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,245 @@
|
||||||
|
/**
|
||||||
|
* This software was developed and / or modified by Raytheon Company,
|
||||||
|
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||||
|
*
|
||||||
|
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||||
|
* This software product contains export-restricted data whose
|
||||||
|
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||||
|
* to non-U.S. persons whether in the United States or abroad requires
|
||||||
|
* an export license or other authorization.
|
||||||
|
*
|
||||||
|
* Contractor Name: Raytheon Company
|
||||||
|
* Contractor Address: 6825 Pine Street, Suite 340
|
||||||
|
* Mail Stop B8
|
||||||
|
* Omaha, NE 68106
|
||||||
|
* 402.291.0100
|
||||||
|
*
|
||||||
|
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||||
|
* further licensing information.
|
||||||
|
**/
|
||||||
|
package com.raytheon.uf.edex.database.cluster.handler;
|
||||||
|
|
||||||
|
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||||
|
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||||
|
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to limit a cluster task to either a writer client or reader shared
|
||||||
|
* clients. When unlocking a lock with a shared reader a client must use the
|
||||||
|
* ClusterLockUtils.getLocks() to find the desired cluster tasks with the latest
|
||||||
|
* updates to extrainfo field. When getting the cluster task for an exclusive
|
||||||
|
* writer client the returned cluster task may be used since no other client
|
||||||
|
* will be associated with the cluster.
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
*
|
||||||
|
* SOFTWARE HISTORY
|
||||||
|
*
|
||||||
|
* Date Ticket# Engineer Description
|
||||||
|
* ------------ ---------- ----------- --------------------------
|
||||||
|
* Mar 31, 2014 2862 rferrel Initial creation
|
||||||
|
*
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* @author rferrel
|
||||||
|
* @version 1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
public final class SharedLockHandler extends CurrentTimeClusterLockHandler {
|
||||||
|
|
||||||
|
/** Client Lock type placed in cluster tasks' extrainfo column. */
|
||||||
|
public static enum LockType {
|
||||||
|
/** Shared reader lock. */
|
||||||
|
READER,
|
||||||
|
/** Exclusive writer lock. */
|
||||||
|
WRITER
|
||||||
|
};
|
||||||
|
|
||||||
|
/** The value for the cluster tasks' name column. */
|
||||||
|
public static final String name = "Shared Lock";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common override time out. Clients need to rely on this value in order to
|
||||||
|
* update last execution time to maintain the lock.
|
||||||
|
*/
|
||||||
|
private static final long OVERRIDE_TIMEOUT = 5 * TimeUtil.MILLIS_PER_MINUTE;
|
||||||
|
|
||||||
|
/** Used to split type and count. */
|
||||||
|
private final String SPLIT_DELIMITER = ":";
|
||||||
|
|
||||||
|
/** The handler's current lock time. */
|
||||||
|
private LockType type;
|
||||||
|
|
||||||
|
/** The lock type handler wants to obtain. */
|
||||||
|
private LockType wantedType;
|
||||||
|
|
||||||
|
/** The number of clients associated with the cluster task's lock. */
|
||||||
|
private int lockCount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor using default time out override.
|
||||||
|
*
|
||||||
|
* @param wantedType
|
||||||
|
*/
|
||||||
|
public SharedLockHandler(LockType wantedType) {
|
||||||
|
super(OVERRIDE_TIMEOUT, true);
|
||||||
|
this.wantedType = wantedType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see
|
||||||
|
* com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler
|
||||||
|
* #setTimeOutOverride(long)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setTimeOutOverride(long timeOutOverride) {
|
||||||
|
/*
|
||||||
|
* Do not allow the override time out to be changed. If changed may
|
||||||
|
* allowed the lock to be taken from an active client.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the count and lock type from the string.
|
||||||
|
*
|
||||||
|
* @param extraInfo
|
||||||
|
*/
|
||||||
|
public void parseExtraInfoString(String extraInfo) {
|
||||||
|
if ((extraInfo == null) || (extraInfo.length() == 0)) {
|
||||||
|
// Creating new entry,
|
||||||
|
type = null;
|
||||||
|
lockCount = 0;
|
||||||
|
} else {
|
||||||
|
StringBuilder errorMessage = null;
|
||||||
|
if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
|
||||||
|
errorMessage = new StringBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
String val[] = extraInfo.split(SPLIT_DELIMITER);
|
||||||
|
if (val.length == 2) {
|
||||||
|
if (LockType.READER.name().equals(val[0])) {
|
||||||
|
type = LockType.READER;
|
||||||
|
} else if (LockType.WRITER.name().equals(val[0])) {
|
||||||
|
type = LockType.WRITER;
|
||||||
|
} else if (errorMessage != null) {
|
||||||
|
errorMessage.append("has invalid lock type");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
lockCount = Integer.parseInt(val[1]);
|
||||||
|
} catch (NumberFormatException ex) {
|
||||||
|
if (errorMessage != null) {
|
||||||
|
if (errorMessage.length() > 0) {
|
||||||
|
errorMessage.append(" and");
|
||||||
|
} else {
|
||||||
|
errorMessage.append("has");
|
||||||
|
}
|
||||||
|
errorMessage.append(" invalid count value");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (errorMessage != null) {
|
||||||
|
errorMessage.append("is a corrupted value");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((errorMessage != null) && (errorMessage.length() > 0)) {
|
||||||
|
statusHandler.handle(Priority.PROBLEM, String.format(
|
||||||
|
" The extrainfo column value: \"%s\" %s.", extraInfo,
|
||||||
|
errorMessage.toString()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return true when wanted lock and cluster lock are the same
|
||||||
|
*/
|
||||||
|
public boolean locksMatch() {
|
||||||
|
return wantedType.equals(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return lockCount
|
||||||
|
*/
|
||||||
|
public int getLockCount() {
|
||||||
|
return lockCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the current lock type and count into an information string.
|
||||||
|
*
|
||||||
|
* @return extraInfo
|
||||||
|
*/
|
||||||
|
private String createExtraInfoString() {
|
||||||
|
return String.format("%s%s%d", type.name(), SPLIT_DELIMITER, lockCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see
|
||||||
|
* com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler
|
||||||
|
* #handleLock(com.raytheon.uf.edex.database.cluster.ClusterTask)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public LockState handleLock(ClusterTask ct) {
|
||||||
|
parseExtraInfoString(ct.getExtraInfo());
|
||||||
|
LockState ls = super.handleLock(ct);
|
||||||
|
|
||||||
|
if (ct.isRunning()) {
|
||||||
|
if (LockState.SUCCESSFUL.equals(ls)) {
|
||||||
|
// Assume locked timed out.
|
||||||
|
lockCount = 0;
|
||||||
|
} else if (LockType.READER.equals(wantedType) && locksMatch()) {
|
||||||
|
// Allow shared reader.
|
||||||
|
ls = LockState.SUCCESSFUL;
|
||||||
|
}
|
||||||
|
} else if (LockState.SUCCESSFUL.equals(ls)) {
|
||||||
|
// non-running lock play it safe and reset the count.
|
||||||
|
lockCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LockState.SUCCESSFUL.equals(ls)) {
|
||||||
|
type = wantedType;
|
||||||
|
lockCount++;
|
||||||
|
extraInfo = createExtraInfoString();
|
||||||
|
}
|
||||||
|
|
||||||
|
return ls;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see
|
||||||
|
* com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler
|
||||||
|
* #unlock(com.raytheon.uf.edex.database.cluster.ClusterTask, boolean)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void unlock(ClusterTask ct, boolean clearTime) {
|
||||||
|
parseExtraInfoString(ct.getExtraInfo());
|
||||||
|
|
||||||
|
if (LockType.WRITER.equals(type)) {
|
||||||
|
lockCount = 0;
|
||||||
|
} else {
|
||||||
|
/*
|
||||||
|
* For LockType.READER this assumes ClusterLockUtils.getLocks(name)
|
||||||
|
* is used to find the cluster task in order to get latest extra
|
||||||
|
* info; otherwise the count may be wrong.
|
||||||
|
*/
|
||||||
|
--lockCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update to reflect count change for the lock.
|
||||||
|
ct.setExtraInfo(createExtraInfoString());
|
||||||
|
|
||||||
|
if (lockCount == 0) {
|
||||||
|
ct.setRunning(false);
|
||||||
|
if (clearTime) {
|
||||||
|
ct.setLastExecution(0L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue