Issue #2862 The DatabaseArchiver and ArchivePurger now have write cluster locks.

Change-Id: I067b6037d5b044229f2c1e5acdd7ec5f7a372908

Former-commit-id: 09e4f116a482f2a197c01cdec3609e10e1d2285d
This commit is contained in:
Roger Ferrel 2014-04-01 09:09:26 -05:00
parent 8d918404d5
commit 226b1be015
10 changed files with 884 additions and 332 deletions

View file

@ -90,6 +90,7 @@ import com.raytheon.uf.common.util.FileUtil;
* Dec 04, 2013 2603 rferrel Changes to improve archive purging.
* Dec 17, 2013 2603 rjpeter Fix directory purging.
* Mar 27, 2014 2790 rferrel Detect problems when several purges running at the same time.
* Apr 01, 2014 2862 rferrel Moved purge only routines to ArchivePurgeManager.
* </pre>
*
* @author rferrel
@ -113,9 +114,6 @@ public class ArchiveConfigManager {
/** Mapping of archive configuration data keyed to the name. */
private final Map<String, ArchiveConfig> archiveMap = new HashMap<String, ArchiveConfig>();
/** Limit number of times message is sent. */
private boolean sentPurgeMessage = false;
/** Get the singleton. */
public final static ArchiveConfigManager getInstance() {
return instance;
@ -305,298 +303,6 @@ public class ArchiveConfigManager {
return new File(newArchivePathString);
}
/**
 * Purge the Files that fall outside of the time frame constraints for the
 * archive. This will always leave the archive's top level directories even
 * when they are empty.
 *
 * @param archive
 *            archive configuration whose root directory tree is purged
 * @return purgeCount
 *            number of files and directories deleted
 */
public int purgeExpiredFromArchive(ArchiveConfig archive) {
    String archiveRootDirPath = archive.getRootDir();
    File archiveRootDir = new File(archiveRootDirPath);
    int purgeCount = 0;
    // Re-arm the once-per-purge race-condition warning (see sendPurgeMessage()).
    sentPurgeMessage = false;
    if (!archiveRootDir.isDirectory()) {
        statusHandler.error(archiveRootDir.getAbsolutePath()
                + " not a directory.");
        return purgeCount;
    }
    if (statusHandler.isPriorityEnabled(Priority.INFO)) {
        statusHandler.info("Purging directory: \""
                + archiveRootDir.getAbsolutePath() + "\".");
    }
    if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
        String message = String.format(
                "Start setup of category date helpers for archive: %s.",
                archive.getName());
        statusHandler.debug(message);
    }
    // One date helper per category; used to recognize category directories
    // and to extract file times during the purge below.
    Map<CategoryConfig, CategoryFileDateHelper> helperMap = new HashMap<CategoryConfig, CategoryFileDateHelper>();
    for (CategoryConfig category : archive.getCategoryList()) {
        CategoryFileDateHelper helper = new CategoryFileDateHelper(
                archiveRootDirPath, category);
        helperMap.put(category, helper);
    }
    if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
        String message = String.format(
                "End setup of category date helpers for archive: %s.",
                archive.getName());
        statusHandler.debug(message);
    }
    // Cutoff based on the archive-level retention; files older than this
    // are purge candidates when no category-specific retention applies.
    final Calendar minPurgeTime = calculateExpiration(archive, null);
    // Filter that accepts files whose last-modified time is before the
    // archive-level cutoff.
    IOFileFilter defaultTimeFilter = new IOFileFilter() {
        @Override
        public boolean accept(File dir, String name) {
            File file = new File(dir, name);
            return accept(file);
        }
        @Override
        public boolean accept(File file) {
            Calendar time = TimeUtil.newGmtCalendar();
            time.setTimeInMillis(file.lastModified());
            return time.compareTo(minPurgeTime) < 0;
        }
    };
    File[] topLevelFiles = archiveRootDir.listFiles();
    for (File topFile : topLevelFiles) {
        // In top level directory ignore all hidden files and directories.
        if (!topFile.isHidden()) {
            if (topFile.isDirectory()) {
                boolean isInCategory = false;
                for (CategoryConfig category : archive.getCategoryList()) {
                    CategoryFileDateHelper helper = helperMap.get(category);
                    if (helper.isCategoryDirectory(topFile.getName())) {
                        isInCategory = true;
                        if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                            String message = String
                                    .format("Start purge of category %s - %s, directory \"%s\".",
                                            archive.getName(),
                                            category.getName(),
                                            topFile.getAbsolutePath());
                            statusHandler.info(message);
                        }
                        // Category directory: use the category-specific
                        // retention for selected display labels.
                        final Calendar extPurgeTime = calculateExpiration(
                                archive, category);
                        int pc = purgeDir(topFile, defaultTimeFilter,
                                minPurgeTime, extPurgeTime, helper,
                                category);
                        purgeCount += pc;
                        if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                            String message = String
                                    .format("End purge of category %s - %s, directory \"%s\", deleted %d files and directories.",
                                            archive.getName(),
                                            category.getName(),
                                            topFile.getAbsolutePath(), pc);
                            statusHandler.info(message);
                        }
                        break;
                    }
                }
                if (isInCategory == false) {
                    // Non-category directory: purge purely by the
                    // archive-level time filter.
                    if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                        String message = String.format(
                                "Start purge of directory: \"%s\".",
                                topFile.getAbsolutePath());
                        statusHandler.info(message);
                    }
                    int pc = purgeDir(topFile, defaultTimeFilter);
                    purgeCount += pc;
                    if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                        String message = String
                                .format("End purge of directory: \"%s\", deleted %d files and directories.",
                                        topFile.getAbsolutePath(), pc);
                        statusHandler.info(message);
                    }
                }
            } else {
                // Top-level plain file: delete when older than the cutoff.
                if (defaultTimeFilter.accept(topFile)) {
                    purgeCount += deleteFile(topFile);
                }
            }
        }
    }
    return purgeCount;
}
/**
 * Send race condition message out only one time per purge request. The
 * {@code sentPurgeMessage} flag is reset at the start of each call to
 * {@code purgeExpiredFromArchive}.
 */
private void sendPurgeMessage() {
    if (!sentPurgeMessage) {
        sentPurgeMessage = true;
        if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
            // Fixed grammar in operator-facing message ("more then" -> "more than").
            String message = "Archive purge finding missing directory. Purge may be running on more than one EDEX server";
            statusHandler.handle(Priority.PROBLEM, message);
        }
    }
}
/**
 * Purge the contents of a directory of expired data leaving a possibly
 * empty directory.
 *
 * @param dir
 *            directory whose contents are examined; the directory itself is
 *            not deleted by this call
 * @param defaultTimeFilter
 *            filter accepting files older than the archive-level cutoff
 * @param minPurgeTime
 *            archive-level expiration cutoff
 * @param extPurgeTime
 *            category-level expiration cutoff for selected display labels
 * @param helper
 *            date helper used to classify files for this category
 * @return purgerCount
 *            number of files and directories deleted
 */
private int purgeDir(File dir, IOFileFilter defaultTimeFilter,
        Calendar minPurgeTime, Calendar extPurgeTime,
        CategoryFileDateHelper helper, CategoryConfig category) {
    int purgeCount = 0;
    File[] dirFiles = dir.listFiles();
    if (dirFiles == null) {
        // listFiles() returns null when the directory disappeared —
        // likely another purge running concurrently; warn once.
        sendPurgeMessage();
        return purgeCount;
    }
    for (File file : dirFiles) {
        if (!file.isHidden()) {
            DataSetStatus status = helper.getFileDate(file);
            if (status.isInDataSet()) {
                // File belongs to the category's data set: choose the
                // extended cutoff when any of its labels is selected,
                // otherwise the archive-level minimum cutoff.
                Collection<String> labels = category
                        .getSelectedDisplayNames();
                boolean isSelected = false;
                for (String label : status.getDisplayLabels()) {
                    if (labels.contains(label)) {
                        isSelected = true;
                        break;
                    }
                }
                Calendar checkTime = (isSelected ? extPurgeTime
                        : minPurgeTime);
                Calendar fileTime = status.getTime();
                boolean purge = fileTime.compareTo(checkTime) < 0;
                if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
                    String message = String
                            .format("%s [%s] category [%s] %s retention [%s] checkTime [%s] = %s.",
                                    (file.isDirectory() ? "Directory"
                                            : "File"), file
                                            .getAbsoluteFile(), category
                                            .getName(), (isSelected ? "ext"
                                            : "min"), TimeUtil
                                            .formatCalendar(checkTime),
                                    TimeUtil.formatCalendar(fileTime),
                                    (purge ? "purge" : "retain"));
                    statusHandler.debug(message);
                }
                if (purge) {
                    // Expired directory: remove its entire contents
                    // unconditionally, then the directory itself.
                    if (file.isDirectory()) {
                        purgeCount += purgeDir(file,
                                FileFilterUtils.trueFileFilter());
                    }
                    purgeCount += deleteFile(file);
                }
            } else if (file.isDirectory()) {
                // Not part of the data set: recurse, then remove the
                // directory only when it ends up empty.
                purgeCount += purgeDir(file, defaultTimeFilter,
                        minPurgeTime, extPurgeTime, helper, category);
                if (file.list().length == 0) {
                    purgeCount += deleteFile(file);
                }
            } else if (defaultTimeFilter.accept(file)) {
                // Stray file outside the data set: apply default cutoff.
                purgeCount += deleteFile(file);
            }
        }
    }
    return purgeCount;
}
/**
 * Recursively purge a directory's contents using the supplied filter. The
 * directory passed to the initial call is never deleted, so top level
 * directories may be left empty — which is the desired outcome.
 *
 * @param dir
 *            directory to purge
 * @param fileDataFilter
 *            filter deciding which plain files are deleted
 * @return purgeCount number of files and directories deleted
 */
private int purgeDir(File dir, IOFileFilter fileDataFilter) {
    File[] contents = dir.listFiles();
    if (contents == null) {
        // Directory vanished out from under us; another purge may be active.
        sendPurgeMessage();
        return 0;
    }
    int deleted = 0;
    for (File entry : contents) {
        if (entry.isHidden()) {
            continue;
        }
        if (entry.isDirectory()) {
            deleted += purgeDir(entry, fileDataFilter);
            // Remove the directory itself once its contents are gone.
            if (entry.list().length == 0) {
                deleted += deleteFile(entry);
            }
        } else if (fileDataFilter.accept(entry)) {
            deleted += deleteFile(entry);
        }
    }
    return deleted;
}
/**
 * Delete a single file or directory, logging the outcome.
 *
 * @param file
 *            file or directory to delete
 * @return purgeCount 1 when the delete succeeded, otherwise 0
 */
private int deleteFile(File file) {
    // Capture the kind before deletion; afterwards isDirectory() is false.
    final boolean isDir = file.isDirectory();
    if (!file.delete()) {
        statusHandler.warn(String.format("Failed to purge %s: \"%s\"",
                (isDir ? "directory" : "file"), file.getAbsolutePath()));
        return 0;
    }
    if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
        statusHandler.debug(String.format("Purged %s: \"%s\"",
                (isDir ? "directory" : "file"), file.getAbsolutePath()));
    }
    return 1;
}
/**
 * Compute the expiration cutoff for a category: now minus the retention
 * hours. The category's retention wins unless the category is null or its
 * retention is zero, in which case the archive's retention applies. A
 * retention of zero leaves the cutoff at the current time.
 *
 * @param archive
 *            archive supplying the fallback retention
 * @param category
 *            category whose retention takes precedence; may be null
 * @return expireCal cutoff calendar in GMT
 */
private Calendar calculateExpiration(ArchiveConfig archive,
        CategoryConfig category) {
    int hours;
    if ((category == null) || (category.getRetentionHours() == 0)) {
        hours = archive.getRetentionHours();
    } else {
        hours = category.getRetentionHours();
    }
    Calendar expireCal = TimeUtil.newGmtCalendar();
    if (hours != 0) {
        expireCal.add(Calendar.HOUR, -hours);
    }
    return expireCal;
}
/**
* @return the archive with the given name; return null if it does not
* exist.

View file

@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="src" path="src"/>
<classpathentry combineaccessrules="false" kind="src" path="/com.raytheon.uf.common.plugin.nwsauth"/>
<classpathentry kind="output" path="bin"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="src" path="src"/>
<classpathentry combineaccessrules="false" kind="src" path="/com.raytheon.uf.common.plugin.nwsauth"/>
<classpathentry kind="output" path="bin"/>
</classpath>

View file

@ -20,4 +20,5 @@ Require-Bundle: com.raytheon.uf.common.auth;bundle-version="1.12.1174",
com.raytheon.uf.common.time,
com.raytheon.uf.common.util;bundle-version="1.12.1174",
com.raytheon.uf.edex.auth;bundle-version="1.12.1174",
com.raytheon.uf.edex.core
com.raytheon.uf.edex.core,
org.apache.commons.io;bundle-version="2.4.0"

View file

@ -19,6 +19,7 @@
**/
package com.raytheon.uf.edex.archive;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
@ -26,12 +27,15 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.PluginException;
import com.raytheon.uf.common.dataplugin.PluginProperties;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.database.DataAccessLayerException;
@ -39,6 +43,8 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler;
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler;
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler.LockType;
import com.raytheon.uf.edex.database.plugin.PluginDao;
import com.raytheon.uf.edex.database.plugin.PluginFactory;
@ -59,6 +65,7 @@ import com.raytheon.uf.edex.database.plugin.PluginFactory;
* Nov 11, 2013 2478 rjpeter Updated data store copy to always copy hdf5.
* Dec 13, 2013 2555 rjpeter Refactored logic into DatabaseArchiveProcessor.
* Feb 12, 2014 2784 rjpeter Fixed clusterLock to not update the time by default.
* Apr 01, 2014 2862 rferrel Add exclusive lock at plug-in level.
* </pre>
*
* @author rjpeter
@ -108,6 +115,23 @@ public class DatabaseArchiver implements IPluginArchiver {
private final boolean compressDatabaseFiles;
/** Task to update the lock time for the locked plugin cluster task. */
private static final class LockUpdateTask extends TimerTask {
    // The locked cluster task's details (lock key within the shared lock
    // name space); identifies which lock's time to refresh.
    private final String details;
    /**
     * Constructor.
     *
     * @param details
     *            details of the cluster lock to keep alive
     */
    public LockUpdateTask(String details) {
        this.details = details;
    }
    /**
     * Refresh the shared lock's last execution time so other cluster
     * members do not treat the lock as stale while archiving runs.
     */
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        ClusterLockUtils.updateLockTime(SharedLockHandler.name, details,
                currentTime);
    }
}
/**
* The constructor.
*/
@ -138,7 +162,61 @@ public class DatabaseArchiver implements IPluginArchiver {
}
}
public boolean archivePluginData(String pluginName, String archivePath) {
/**
 * Attempt to obtain the exclusive writer lock for the given details.
 *
 * @param details
 *            lock key (plug-in archive directory path)
 * @return the cluster task when the lock was obtained, otherwise null
 */
private ClusterTask getWriteLock(String details) {
    SharedLockHandler lockHandler = new SharedLockHandler(LockType.WRITER);
    ClusterTask task = ClusterLockUtils.lock(SharedLockHandler.name,
            details, lockHandler, false);
    if (!LockState.SUCCESSFUL.equals(task.getLockState())) {
        // Another cluster member holds the lock; skip this archive pass.
        if (statusHandler.isPriorityEnabled(Priority.INFO)) {
            statusHandler.handle(Priority.INFO, String.format(
                    "Skip database Archive unable to lock: \"%s\"", task
                            .getId().getDetails()));
        }
        return null;
    }
    if (statusHandler.isPriorityEnabled(Priority.INFO)) {
        statusHandler.handle(Priority.INFO,
                String.format("Locked: \"%s\"", task.getId().getDetails()));
    }
    return task;
}
/**
 * Release the exclusive writer lock, logging failure as a problem.
 *
 * @param ct
 *            the locked cluster task to release
 */
private void releaseWriteLock(ClusterTask ct) {
    boolean unlocked = ClusterLockUtils.unlock(ct, false);
    if (unlocked) {
        if (statusHandler.isPriorityEnabled(Priority.INFO)) {
            statusHandler.handle(Priority.INFO, String.format(
                    "Unlocked: \"%s\"", ct.getId().getDetails()));
        }
    } else if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
        statusHandler.handle(Priority.PROBLEM, String.format(
                "Unable to unlock: \"%s\"", ct.getId().getDetails()));
    }
}
public void archivePluginData(String pluginName, String archivePath) {
File archiveDir = new File(archivePath);
File pluginDir = new File(archiveDir, pluginName);
ClusterTask ctPlugin = getWriteLock(pluginDir.getAbsolutePath());
if (ctPlugin == null) {
return;
}
SimpleDateFormat dateFormat = TL_DATE_FORMAT.get();
// set archive time
Calendar runTime = TimeUtil.newGmtCalendar();
@ -150,7 +228,8 @@ public class DatabaseArchiver implements IPluginArchiver {
ClusterTask ct = ClusterLockUtils.lock(TASK_NAME, pluginName,
lockHandler, false);
if (!LockState.SUCCESSFUL.equals(ct.getLockState())) {
return true;
releaseWriteLock(ctPlugin);
return;
}
// keep extra info the same until processing updates the time.
@ -161,6 +240,11 @@ public class DatabaseArchiver implements IPluginArchiver {
int recordCount = 0;
statusHandler.info(pluginName + ": Archiving plugin");
Timer lockUpdateTimer = new Timer("Update Shared Lock Time", true);
TimerTask task = new LockUpdateTask(ctPlugin.getId().getDetails());
lockUpdateTimer.schedule(task, TimeUtil.MILLIS_PER_MINUTE,
TimeUtil.MILLIS_PER_MINUTE);
try {
// lookup dao
PluginDao dao = null;
@ -171,7 +255,7 @@ public class DatabaseArchiver implements IPluginArchiver {
.error(pluginName
+ ": Error getting data access object! Unable to archive data!",
e);
return false;
return;
}
startTime = determineStartTime(pluginName, ct.getExtraInfo(),
@ -236,9 +320,21 @@ public class DatabaseArchiver implements IPluginArchiver {
// release lock setting archive time in cluster lock
ClusterLockUtils.unlock(ct, false);
}
/*
* Stop updating ctPlugin's last execution time before releasing the
* cluster's lock.
*/
if (lockUpdateTimer != null) {
lockUpdateTimer.cancel();
}
if (ctPlugin != null) {
releaseWriteLock(ctPlugin);
}
}
return true;
return;
}
/**

View file

@ -0,0 +1,503 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.archive.purge;
import java.io.File;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import com.raytheon.uf.common.archive.config.ArchiveConfig;
import com.raytheon.uf.common.archive.config.ArchiveConfigManager;
import com.raytheon.uf.common.archive.config.CategoryConfig;
import com.raytheon.uf.common.archive.config.CategoryFileDateHelper;
import com.raytheon.uf.common.archive.config.DataSetStatus;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.cluster.ClusterTaskPK;
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler;
import com.raytheon.uf.edex.database.cluster.handler.SharedLockHandler.LockType;
/**
 * This class coordinates cluster locking of plug-in directories while
 * performing the archive purge.
 *
 * <pre>
 *
 * SOFTWARE HISTORY
 *
 * Date         Ticket#    Engineer    Description
 * ------------ ---------- ----------- --------------------------
 * Apr 01, 2014 2862       rferrel     Initial creation
 *
 * </pre>
 *
 * @author rferrel
 * @version 1.0
 */
public class ArchivePurgeManager {
    private final IUFStatusHandler statusHandler = UFStatus
            .getHandler(ArchivePurgeManager.class);

    /** Single instance of the manager. */
    private final static ArchivePurgeManager instance = new ArchivePurgeManager();

    /** Manager to handle archive configuration information. */
    private final ArchiveConfigManager manager;

    /** Limit number of times message is sent. */
    private boolean sentPurgeMessage = false;

    /** Prevent flooding of lock updates on cluster lock. */
    private final ITimer lockUpdateTimer = TimeUtil.getTimer();

    /**
     * Private constructor for singleton.
     */
    private ArchivePurgeManager() {
        manager = ArchiveConfigManager.getInstance();
    }

    /**
     * Get the singleton.
     *
     * @return instance
     */
    public static ArchivePurgeManager getInstance() {
        return instance;
    }

    /**
     * Purge the Files that fall outside of the time frame constraints for the
     * archive. This will always leave the archive's top level directories even
     * when they are empty. Each top-level directory is purged under an
     * exclusive cluster write lock; directories whose lock cannot be obtained
     * are retried until a full pass obtains no new locks.
     *
     * @param archive
     *            archive configuration whose root directory tree is purged
     * @return purgeCount number of files and directories deleted
     */
    public int purgeExpiredFromArchive(ArchiveConfig archive) {
        String archiveRootDirPath = archive.getRootDir();
        File archiveRootDir = new File(archiveRootDirPath);
        int purgeCount = 0;
        sentPurgeMessage = false;
        if (!archiveRootDir.isDirectory()) {
            statusHandler.error(archiveRootDir.getAbsolutePath()
                    + " not a directory.");
            return purgeCount;
        }
        if (statusHandler.isPriorityEnabled(Priority.INFO)) {
            statusHandler.info("Purging directory: \""
                    + archiveRootDir.getAbsolutePath() + "\".");
        }
        if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
            String message = String.format(
                    "Start setup of category date helpers for archive: %s.",
                    archive.getName());
            statusHandler.debug(message);
        }
        Map<CategoryConfig, CategoryFileDateHelper> helperMap = new HashMap<CategoryConfig, CategoryFileDateHelper>();
        for (CategoryConfig category : archive.getCategoryList()) {
            CategoryFileDateHelper helper = new CategoryFileDateHelper(
                    archiveRootDirPath, category);
            helperMap.put(category, helper);
        }
        if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
            String message = String.format(
                    "End setup of category date helpers for archive: %s.",
                    archive.getName());
            statusHandler.debug(message);
        }
        final Calendar minPurgeTime = calculateExpiration(archive, null);
        // Accepts files whose last-modified time is before the archive-level
        // cutoff.
        IOFileFilter defaultTimeFilter = new IOFileFilter() {
            @Override
            public boolean accept(File dir, String name) {
                File file = new File(dir, name);
                return accept(file);
            }

            @Override
            public boolean accept(File file) {
                Calendar time = TimeUtil.newGmtCalendar();
                time.setTimeInMillis(file.lastModified());
                return time.compareTo(minPurgeTime) < 0;
            }
        };
        List<File> topLevelFiles = new LinkedList<File>(
                Arrays.asList(archiveRootDir.listFiles()));
        int previousSize = -1;
        // Keep looping as long as we keep getting locks or list is empty.
        while ((topLevelFiles.size() > 0)
                && (previousSize != topLevelFiles.size())) {
            previousSize = topLevelFiles.size();
            Iterator<File> topLevelIter = topLevelFiles.iterator();
            while (topLevelIter.hasNext()) {
                File topFile = topLevelIter.next();
                /*
                 * In top level directory ignore all hidden files and
                 * directories.
                 */
                ClusterTask ct = null;
                if (topFile.isHidden()) {
                    topLevelIter.remove();
                } else {
                    if (topFile.isDirectory()) {
                        ct = getWriteLock(topFile.getAbsolutePath());
                        if (ct == null) {
                            // Lock held elsewhere; leave in the list so the
                            // outer loop retries it on the next pass.
                            continue;
                        }
                        topLevelIter.remove();
                        boolean isInCategory = false;
                        for (CategoryConfig category : archive
                                .getCategoryList()) {
                            CategoryFileDateHelper helper = helperMap
                                    .get(category);
                            if (helper.isCategoryDirectory(topFile.getName())) {
                                isInCategory = true;
                                if (statusHandler
                                        .isPriorityEnabled(Priority.INFO)) {
                                    String message = String
                                            .format("Start purge of category %s - %s, directory \"%s\".",
                                                    archive.getName(),
                                                    category.getName(),
                                                    topFile.getAbsolutePath());
                                    statusHandler.info(message);
                                }
                                final Calendar extPurgeTime = calculateExpiration(
                                        archive, category);
                                int pc = purgeDir(topFile, defaultTimeFilter,
                                        minPurgeTime, extPurgeTime, helper,
                                        category, ct);
                                purgeCount += pc;
                                if (statusHandler
                                        .isPriorityEnabled(Priority.INFO)) {
                                    String message = String
                                            .format("End purge of category %s - %s, directory \"%s\", deleted %d files and directories.",
                                                    archive.getName(),
                                                    category.getName(),
                                                    topFile.getAbsolutePath(),
                                                    pc);
                                    statusHandler.info(message);
                                }
                                break;
                            }
                        }
                        if (isInCategory == false) {
                            if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                                String message = String.format(
                                        "Start purge of directory: \"%s\".",
                                        topFile.getAbsolutePath());
                                statusHandler.info(message);
                            }
                            int pc = purgeDir(topFile, defaultTimeFilter);
                            purgeCount += pc;
                            if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                                String message = String
                                        .format("End purge of directory: \"%s\", deleted %d files and directories.",
                                                topFile.getAbsolutePath(), pc);
                                statusHandler.info(message);
                            }
                        }
                        releaseWriteLock(ct);
                        ct = null;
                    } else {
                        if (defaultTimeFilter.accept(topFile)) {
                            purgeCount += deleteFile(topFile);
                        }
                    }
                }
            }
        }
        return purgeCount;
    }

    /**
     * Attempt to get exclusive write lock.
     *
     * @param details
     *            lock key (top-level archive directory path)
     * @return clusterTask when getting lock successful otherwise null
     */
    private ClusterTask getWriteLock(String details) {
        SharedLockHandler lockHandler = new SharedLockHandler(LockType.WRITER);
        ClusterTask ct = ClusterLockUtils.lock(SharedLockHandler.name, details,
                lockHandler, false);
        if (ct.getLockState().equals(LockState.SUCCESSFUL)) {
            if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                statusHandler.handle(Priority.INFO, String.format(
                        "Locked: \"%s\"", ct.getId().getDetails()));
            }
            // Start throttling timer used by updateLockTime().
            lockUpdateTimer.reset();
            lockUpdateTimer.start();
        } else {
            if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                statusHandler.handle(Priority.INFO, String.format(
                        "Skip purge unable to lock: \"%s\"", ct.getId()
                                .getDetails()));
            }
            ct = null;
        }
        return ct;
    }

    /**
     * Unlock the consumer's lock.
     *
     * @param ct
     *            the locked cluster task to release
     */
    private void releaseWriteLock(ClusterTask ct) {
        if (ClusterLockUtils.unlock(ct, false)) {
            if (statusHandler.isPriorityEnabled(Priority.INFO)) {
                statusHandler.handle(Priority.INFO, String.format(
                        "Unlocked: \"%s\"", ct.getId().getDetails()));
            }
        } else {
            if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
                statusHandler.handle(Priority.PROBLEM, String.format(
                        "Unable to unlock: \"%s\"", ct.getId().getDetails()));
            }
        }
        lockUpdateTimer.stop();
    }

    /**
     * Refresh the cluster lock's last execution time so other cluster members
     * do not treat it as stale, limited to at most once per minute.
     *
     * @param ct
     *            the locked cluster task being refreshed
     */
    private void updateLockTime(ClusterTask ct) {
        ClusterTaskPK id = ct.getId();
        // Slow down the rate at which we hit the database.
        if (lockUpdateTimer.getElapsedTime() >= TimeUtil.MILLIS_PER_MINUTE) {
            lockUpdateTimer.stop();
            lockUpdateTimer.reset();
            lockUpdateTimer.start();
            ClusterLockUtils.updateLockTime(id.getName(), id.getDetails(),
                    System.currentTimeMillis());
        }
    }

    /**
     * Purge the contents of a directory of expired data leaving a possibly
     * empty directory.
     *
     * @param dir
     * @param defaultTimeFilter
     * @param minPurgeTime
     * @param extPurgeTime
     * @param helper
     * @param category
     * @param ct
     *            locked cluster task whose time is refreshed while purging
     * @return purgeCount
     */
    private int purgeDir(File dir, IOFileFilter defaultTimeFilter,
            Calendar minPurgeTime, Calendar extPurgeTime,
            CategoryFileDateHelper helper, CategoryConfig category,
            ClusterTask ct) {
        int purgeCount = 0;
        File[] dirFiles = dir.listFiles();
        if (dirFiles == null) {
            sendPurgeMessage();
            return purgeCount;
        }
        for (File file : dirFiles) {
            // Keep the cluster lock alive during long-running purges.
            updateLockTime(ct);
            if (!file.isHidden()) {
                DataSetStatus status = helper.getFileDate(file);
                if (status.isInDataSet()) {
                    Collection<String> labels = category
                            .getSelectedDisplayNames();
                    boolean isSelected = false;
                    for (String label : status.getDisplayLabels()) {
                        if (labels.contains(label)) {
                            isSelected = true;
                            break;
                        }
                    }
                    Calendar checkTime = (isSelected ? extPurgeTime
                            : minPurgeTime);
                    Calendar fileTime = status.getTime();
                    boolean purge = fileTime.compareTo(checkTime) < 0;
                    if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
                        String message = String
                                .format("%s [%s] category [%s] %s retention [%s] checkTime [%s] = %s.",
                                        (file.isDirectory() ? "Directory"
                                                : "File"), file
                                                .getAbsoluteFile(), category
                                                .getName(), (isSelected ? "ext"
                                                : "min"), TimeUtil
                                                .formatCalendar(checkTime),
                                        TimeUtil.formatCalendar(fileTime),
                                        (purge ? "purge" : "retain"));
                        statusHandler.debug(message);
                    }
                    if (purge) {
                        if (file.isDirectory()) {
                            purgeCount += purgeDir(file,
                                    FileFilterUtils.trueFileFilter());
                        }
                        purgeCount += deleteFile(file);
                    }
                } else if (file.isDirectory()) {
                    purgeCount += purgeDir(file, defaultTimeFilter,
                            minPurgeTime, extPurgeTime, helper, category, ct);
                    if (file.list().length == 0) {
                        purgeCount += deleteFile(file);
                    }
                } else if (defaultTimeFilter.accept(file)) {
                    purgeCount += deleteFile(file);
                }
            }
        }
        return purgeCount;
    }

    /**
     * Recursively purge the contents of a directory based on the filter. The
     * directory in the initial call is not deleted. This may result in an empty
     * directory which is the desired result for top level directories.
     *
     * @param dir
     * @param fileDataFilter
     * @return purgeCount
     */
    private int purgeDir(File dir, IOFileFilter fileDataFilter) {
        int purgeCount = 0;
        File[] dirFiles = dir.listFiles();
        if (dirFiles == null) {
            sendPurgeMessage();
        } else {
            for (File file : dirFiles) {
                if (!file.isHidden()) {
                    if (file.isDirectory()) {
                        purgeCount += purgeDir(file, fileDataFilter);
                        if (file.list().length == 0) {
                            purgeCount += deleteFile(file);
                        }
                    } else if (fileDataFilter.accept(file)) {
                        purgeCount += deleteFile(file);
                    }
                }
            }
        }
        return purgeCount;
    }

    /**
     * Delete a file or directory.
     *
     * @param file
     * @return purgeCount
     */
    private int deleteFile(File file) {
        int purgeCount = 0;
        boolean isDir = file.isDirectory();
        if (file.delete()) {
            ++purgeCount;
            if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
                statusHandler
                        .debug(String.format("Purged %s: \"%s\"",
                                (isDir ? "directory" : "file"),
                                file.getAbsolutePath()));
            }
        } else {
            statusHandler.warn(String.format("Failed to purge %s: \"%s\"",
                    (isDir ? "directory" : "file"), file.getAbsolutePath()));
        }
        return purgeCount;
    }

    /**
     * Get expiration time for the category.
     *
     * @param archive
     * @param category
     * @return expireCal
     */
    private Calendar calculateExpiration(ArchiveConfig archive,
            CategoryConfig category) {
        Calendar expireCal = TimeUtil.newGmtCalendar();
        int retHours = (category == null)
                || (category.getRetentionHours() == 0) ? archive
                .getRetentionHours() : category.getRetentionHours();
        if (retHours != 0) {
            expireCal.add(Calendar.HOUR, (-1) * retHours);
        }
        return expireCal;
    }

    /**
     * Send race condition message out only one time per purge request.
     */
    private void sendPurgeMessage() {
        if (!sentPurgeMessage) {
            sentPurgeMessage = true;
            if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
                // Fixed grammar in operator-facing message ("more then" -> "more than").
                String message = "Archive purge finding missing directory. Purge may be running on more than one EDEX server";
                statusHandler.handle(Priority.PROBLEM, message);
            }
        }
    }

    /**
     * Get archives from the archive configuration manager.
     *
     * @return archives
     */
    public Collection<ArchiveConfig> getArchives() {
        return manager.getArchives();
    }

    /**
     * Perform the archive configuration manager's reset.
     */
    public void reset() {
        manager.reset();
    }
}

View file

@ -22,7 +22,6 @@ package com.raytheon.uf.edex.archive.purge;
import java.util.Collection;
import com.raytheon.uf.common.archive.config.ArchiveConfig;
import com.raytheon.uf.common.archive.config.ArchiveConfigManager;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
@ -45,6 +44,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
* Sep 03, 2013 2224 rferrel Add check to enable/disable purger.
* Nov 05, 2013 2499 rjpeter Repackaged
* Dec 17, 2013 2603 rjpeter Reload configuration every run of purge.
* Apr 01, 2014 2862 rferrel Refactored to use ArchivePurgeManager.
* </pre>
*
* @author bgonzale
@ -67,7 +67,7 @@ public class ArchivePurger {
ITimer timer = TimeUtil.getTimer();
timer.start();
statusHandler.info("Archive Purge started.");
ArchiveConfigManager manager = ArchiveConfigManager.getInstance();
ArchivePurgeManager manager = ArchivePurgeManager.getInstance();
manager.reset();
Collection<ArchiveConfig> archives = manager.getArchives();
for (ArchiveConfig archive : archives) {

View file

@ -473,8 +473,7 @@ public class ClusterLockUtils {
* reset to the epoch time. This can be useful when wanting the next check
* to always succeed.
*
* @param taskName
* @param details
* @param ct
* @param clearTime
* @return
*/
@ -533,7 +532,6 @@ public class ClusterLockUtils {
*
* @param taskName
* @param details
* @param clearTime
* @return
*/
public static boolean unlock(String taskName, String details) {

View file

@ -32,7 +32,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.handler.IClusterLockHandler;
/**
* TODO Add Description
The awips.cluster_task table.
*
* <pre>
*
@ -40,6 +40,7 @@ import com.raytheon.uf.edex.database.cluster.handler.IClusterLockHandler;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 19, 2010 njensen Initial creation
* Apr 02, 2014 2862 rferrel Make lockHanler getter/setters public.
*
* </pre>
*
@ -128,11 +129,11 @@ public class ClusterTask implements ISerializableObject, Serializable {
this.lockState = lockState;
}
protected IClusterLockHandler getLockHandler() {
public IClusterLockHandler getLockHandler() {
return lockHandler;
}
protected void setLockHandler(IClusterLockHandler lockHandler) {
public void setLockHandler(IClusterLockHandler lockHandler) {
this.lockHandler = lockHandler;
}
}

View file

@ -26,7 +26,8 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
/**
* TODO Add Description
This class allows a lock in the cluster task table to be overridden when the
last execution time is older than the current time by a specified amount.
*
* <pre>
*
@ -35,6 +36,8 @@ import com.raytheon.uf.edex.database.cluster.ClusterTask;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 15, 2010 rjpeter Initial creation
* Apr 08, 2014 2862 rferrel Changes to properly allow statusHandler
* to pickup sub-class.
*
* </pre>
*
@ -43,8 +46,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterTask;
*/
public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
private static final transient IUFStatusHandler handler = UFStatus
.getHandler(CurrentTimeClusterLockHandler.class);
protected final IUFStatusHandler statusHandler;
protected long checkTime;
@ -62,8 +64,7 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
* @param extraInfo
*/
public CurrentTimeClusterLockHandler(long timeOutOverride, String extraInfo) {
this.timeOutOverride = timeOutOverride;
this.extraInfo = extraInfo;
this(timeOutOverride, extraInfo, true);
}
/**
@ -76,8 +77,7 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
*/
public CurrentTimeClusterLockHandler(long timeOutOverride,
boolean saveExtraInfoOnLock) {
this.timeOutOverride = timeOutOverride;
this.saveExtraInfoOnLock = saveExtraInfoOnLock;
this(timeOutOverride, null, saveExtraInfoOnLock);
}
/**
@ -91,6 +91,7 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
*/
public CurrentTimeClusterLockHandler(long timeOutOverride,
String extraInfo, boolean saveExtraInfoOnLock) {
this.statusHandler = UFStatus.getHandler(this.getClass());
this.timeOutOverride = timeOutOverride;
this.extraInfo = extraInfo;
this.saveExtraInfoOnLock = saveExtraInfoOnLock;
@ -105,18 +106,19 @@ public class CurrentTimeClusterLockHandler implements IClusterLockHandler {
ls = LockState.ALREADY_RUNNING;
// Override
if (checkTime > ct.getLastExecution() + timeOutOverride) {
if (handler.isPriorityEnabled(Priority.INFO)) {
handler.handle(
Priority.INFO,
"Overriding lock for cluster task ["
+ ct.getId().getName()
+ "/"
+ ct.getId().getDetails()
+ "] time out ["
+ timeOutOverride
+ "] exceeded by "
+ (checkTime - (ct.getLastExecution() + timeOutOverride))
+ " ms.");
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
statusHandler
.handle(Priority.INFO,
"Overriding lock for cluster task ["
+ ct.getId().getName()
+ "/"
+ ct.getId().getDetails()
+ "] time out ["
+ timeOutOverride
+ "] exceeded by "
+ (checkTime - (ct
.getLastExecution() + timeOutOverride))
+ " ms.");
}
ls = LockState.SUCCESSFUL;
}

View file

@ -0,0 +1,245 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.database.cluster.handler;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
/**
* Class to limit a cluster task to either a writer client or reader shared
* clients. When unlocking a lock with a shared reader a client must use the
* ClusterLockUtils.getLocks() to find the desired cluster tasks with the latest
* updates to extrainfo field. When getting the cluster task for an exclusive
* writer client the returned cluster task may be used since no other client
* will be associated with the cluster.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 31, 2014 2862 rferrel Initial creation
*
* </pre>
*
* @author rferrel
* @version 1.0
*/
public final class SharedLockHandler extends CurrentTimeClusterLockHandler {

    /** Client Lock type placed in cluster tasks' extrainfo column. */
    public static enum LockType {
        /** Shared reader lock. */
        READER,
        /** Exclusive writer lock. */
        WRITER
    };

    /** The value for the cluster tasks' name column. */
    public static final String name = "Shared Lock";

    /**
     * Common override time out. Clients need to rely on this value in order to
     * update last execution time to maintain the lock.
     */
    private static final long OVERRIDE_TIMEOUT = 5 * TimeUtil.MILLIS_PER_MINUTE;

    /**
     * Used to split type and count in the extrainfo column. Static/final since
     * the delimiter is an immutable constant shared by all instances.
     */
    private static final String SPLIT_DELIMITER = ":";

    /**
     * The lock type currently recorded in the cluster task's extrainfo column;
     * null when the entry is new or the column was corrupted.
     */
    private LockType type;

    /** The lock type this handler wants to obtain. */
    private final LockType wantedType;

    /** The number of clients associated with the cluster task's lock. */
    private int lockCount;

    /**
     * Constructor using default time out override.
     *
     * @param wantedType
     *            the kind of lock (shared READER or exclusive WRITER) this
     *            handler attempts to obtain
     */
    public SharedLockHandler(LockType wantedType) {
        super(OVERRIDE_TIMEOUT, true);
        this.wantedType = wantedType;
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler
     * #setTimeOutOverride(long)
     */
    @Override
    public void setTimeOutOverride(long timeOutOverride) {
        /*
         * Do not allow the override time out to be changed. If changed it may
         * allow the lock to be taken from an active client.
         */
    }

    /**
     * Set the count and lock type from the extrainfo column string. The
     * expected format is "TYPE:count" (see createExtraInfoString()). A null or
     * empty value is treated as a new entry; a malformed value is logged at
     * PROBLEM priority.
     *
     * @param extraInfo
     *            raw extrainfo column value, may be null
     */
    public void parseExtraInfoString(String extraInfo) {
        if ((extraInfo == null) || (extraInfo.length() == 0)) {
            // Creating new entry,
            type = null;
            lockCount = 0;
        } else {
            // Only build the diagnostic message when it will be logged.
            StringBuilder errorMessage = null;
            if (statusHandler.isPriorityEnabled(Priority.PROBLEM)) {
                errorMessage = new StringBuilder();
            }
            String val[] = extraInfo.split(SPLIT_DELIMITER);
            if (val.length == 2) {
                if (LockType.READER.name().equals(val[0])) {
                    type = LockType.READER;
                } else if (LockType.WRITER.name().equals(val[0])) {
                    type = LockType.WRITER;
                } else if (errorMessage != null) {
                    errorMessage.append("has invalid lock type");
                }
                try {
                    lockCount = Integer.parseInt(val[1]);
                } catch (NumberFormatException ex) {
                    if (errorMessage != null) {
                        if (errorMessage.length() > 0) {
                            errorMessage.append(" and");
                        } else {
                            errorMessage.append("has");
                        }
                        errorMessage.append(" invalid count value");
                    }
                }
            } else if (errorMessage != null) {
                errorMessage.append("is a corrupted value");
            }

            if ((errorMessage != null) && (errorMessage.length() > 0)) {
                statusHandler.handle(Priority.PROBLEM, String.format(
                        " The extrainfo column value: \"%s\" %s.", extraInfo,
                        errorMessage.toString()));
            }
        }
    }

    /**
     *
     * @return true when wanted lock and cluster lock are the same
     */
    public boolean locksMatch() {
        return wantedType.equals(type);
    }

    /**
     *
     * @return lockCount
     */
    public int getLockCount() {
        return lockCount;
    }

    /**
     * Convert the current lock type and count into an information string.
     *
     * @return extraInfo in the form "TYPE:count"
     */
    private String createExtraInfoString() {
        return String.format("%s%s%d", type.name(), SPLIT_DELIMITER, lockCount);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler
     * #handleLock(com.raytheon.uf.edex.database.cluster.ClusterTask)
     */
    @Override
    public LockState handleLock(ClusterTask ct) {
        parseExtraInfoString(ct.getExtraInfo());
        LockState ls = super.handleLock(ct);
        if (ct.isRunning()) {
            if (LockState.SUCCESSFUL.equals(ls)) {
                // Assume locked timed out.
                lockCount = 0;
            } else if (LockType.READER.equals(wantedType) && locksMatch()) {
                // Allow shared reader.
                ls = LockState.SUCCESSFUL;
            }
        } else if (LockState.SUCCESSFUL.equals(ls)) {
            // non-running lock play it safe and reset the count.
            lockCount = 0;
        }

        if (LockState.SUCCESSFUL.equals(ls)) {
            type = wantedType;
            lockCount++;
            extraInfo = createExtraInfoString();
        }
        return ls;
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * com.raytheon.uf.edex.database.cluster.handler.CurrentTimeClusterLockHandler
     * #unlock(com.raytheon.uf.edex.database.cluster.ClusterTask, boolean)
     */
    @Override
    public void unlock(ClusterTask ct, boolean clearTime) {
        parseExtraInfoString(ct.getExtraInfo());
        if (LockType.WRITER.equals(type)) {
            // Exclusive writer: releasing always frees the lock.
            lockCount = 0;
        } else if (lockCount > 0) {
            /*
             * For LockType.READER this assumes ClusterLockUtils.getLocks(name)
             * is used to find the cluster task in order to get latest extra
             * info; otherwise the count may be wrong.
             *
             * The count is clamped at zero: a corrupted/missing count must not
             * go negative, which would prevent the lockCount == 0 release
             * below from ever firing and leave the lock running forever.
             */
            --lockCount;
        }

        if (type == null) {
            /*
             * Corrupted or missing extrainfo; fall back to the type this
             * handler requested so a valid extrainfo value is written back.
             */
            type = wantedType;
        }

        // Update to reflect count change for the lock.
        ct.setExtraInfo(createExtraInfoString());
        if (lockCount == 0) {
            ct.setRunning(false);
            if (clearTime) {
                ct.setLastExecution(0L);
            }
        }
    }
}