Issue #1421: Make hdf5 and database purge the same, update H5pyDataStore

Change-Id: I9cbe9c5cb185b4eb999b0c0a2d6ff6b6d754d0e7

Former-commit-id: 08919e73c3 [formerly 513dd75a23 [formerly fb25e736d298225d217a1775891a65de7e0bd50c]]
Former-commit-id: 513dd75a23
Former-commit-id: 3d4fb31161
Richard Peter 2012-12-12 17:09:26 -06:00
parent 5d61ce529e
commit 548b521f3f
7 changed files with 144 additions and 132 deletions


@@ -437,7 +437,6 @@ public class GFEDao extends DefaultPluginDao {
}
});
-// we gain nothing by removing from hdf5
Map<File, Pair<List<TimeRange>, String[]>> fileMap = GfeUtil
.getHdf5FilesAndGroups(GridDatabase.gfeBaseDataDir, parmId,
times);
@@ -445,20 +444,21 @@ public class GFEDao extends DefaultPluginDao {
.entrySet()) {
File hdf5File = entry.getKey();
IDataStore dataStore = DataStoreFactory.getDataStore(hdf5File);
+String[] groupsToDelete = entry.getValue().getSecond();
try {
-String[] groupsToDelete = entry.getValue().getSecond();
-for (String grp : groupsToDelete) {
-dataStore.delete(grp);
-}
-statusHandler.handle(Priority.DEBUG,
-"Deleted: " + Arrays.toString(groupsToDelete)
-+ " from " + hdf5File.getName());
+dataStore.delete(groupsToDelete);
+if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
+statusHandler.handle(Priority.DEBUG,
+"Deleted: " + Arrays.toString(groupsToDelete)
++ " from " + hdf5File.getName());
+}
} catch (Exception e) {
-statusHandler.handle(Priority.PROBLEM,
-"Error deleting hdf5 records", e);
+statusHandler.handle(
+Priority.WARN,
+"Error deleting hdf5 record(s) from file: "
++ hdf5File.getPath(), e);
}
}
}
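As a standalone illustration of the pattern this hunk adopts — batch every group name into a single delete call and guard the debug message behind a level check — here is a minimal Java sketch; Store and Log are hypothetical stand-ins for IDataStore and the status handler:

    import java.util.Arrays;

    public class BatchedDeleteSketch {
        /** Stand-in for IDataStore; only a varargs delete is needed here. */
        interface Store {
            void delete(String... groups) throws Exception;
        }

        /** Stand-in for the status handler. */
        interface Log {
            boolean isDebugEnabled();
            void debug(String msg);
            void warn(String msg, Exception e);
        }

        static void purgeGroups(Store store, Log log, String fileName, String[] groups) {
            try {
                // one delete call for all groups instead of one per group
                store.delete(groups);
                // build the debug string only when it will actually be logged
                if (log.isDebugEnabled()) {
                    log.debug("Deleted: " + Arrays.toString(groups) + " from " + fileName);
                }
            } catch (Exception e) {
                // deletion failure is downgraded to a warning naming the file
                log.warn("Error deleting hdf5 record(s) from file: " + fileName, e);
            }
        }
    }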


@@ -1,10 +1,10 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<purgeRuleSet>
<key>modelInfo.modelName</key>
-<!-- Default rule for data not covered by any rules -->
+<!-- Default rule for data not covered by any rules, 2 versions up to 7 days-->
<defaultRule>
<versionsToKeep>2</versionsToKeep>
-<period>02-00:00:00</period>
+<period>07-00:00:00</period>
</defaultRule>
<!-- Purge rule for the NAM80 (ETA) model -->
<rule>
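The period element uses a days-hours:minutes:seconds layout, so 07-00:00:00 reads as seven days. A hedged sketch of how such a value could map to milliseconds (the real parsing lives in the purge-rule classes; periodToMillis is illustrative only):

    public class PurgePeriodSketch {
        /** Parses "DD-HH:MM:SS", e.g. "07-00:00:00" (seven days), into millis. */
        static long periodToMillis(String period) {
            String[] dayAndTime = period.split("-");
            String[] hms = dayAndTime[1].split(":");
            long days = Long.parseLong(dayAndTime[0]);
            long hours = Long.parseLong(hms[0]);
            long minutes = Long.parseLong(hms[1]);
            long seconds = Long.parseLong(hms[2]);
            return (((days * 24 + hours) * 60 + minutes) * 60 + seconds) * 1000L;
        }

        public static void main(String[] args) {
            System.out.println(periodToMillis("07-00:00:00")); // 604800000
        }
    }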


@@ -31,9 +31,9 @@ import com.raytheon.uf.common.time.SimulatedTime;
/**
* Utilities for time, some extracted from Util.
- * 
+ *
* <pre>
- * 
+ *
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
@@ -41,9 +41,9 @@ import com.raytheon.uf.common.time.SimulatedTime;
* Sep 11, 2012 1154 djohnson Add MILLIS constants and isNewerDay().
* Nov 09, 2012 1322 djohnson Add SECONDS_PER_MINUTE.
* Nov 21, 2012 728 mpduff Added MILLIS_PER_MONTH.
- * 
+ *
* </pre>
- * 
+ *
* @author njensen
* @version 1.0
*/
@@ -55,11 +55,11 @@ public class TimeUtil {
* only want to keep track of times in a conditional sense, such as if a
* logging priority is enabled. This is an example of the Null Object
* pattern.
- * 
+ *
* @see http://en.wikipedia.org/wiki/Null_Object_pattern
- * 
+ *
* @author djohnson
- * 
+ *
*/
private static class NullClock extends AbstractTimer {
@Override
@@ -71,9 +71,9 @@ public class TimeUtil {
/**
* Delegates the retrieval of the current time to the system clock.
* Production code will always use this.
- * 
+ *
* @author djohnson
- * 
+ *
*/
private static class SystemTimeStrategy implements ITimeStrategy {
@Override
@@ -102,18 +102,26 @@ public class TimeUtil {
public static final int SECONDS_PER_MINUTE = 60;
// create instance of simple date format on class load, as instantiating it
// is expensive the SimpleDateFormat class is not thread-safe,
// so calling methods use synchronized
-private static SimpleDateFormat sdf = new SimpleDateFormat(
-"yyyy-MM-dd_HH:mm:ss.S");
-private static SimpleDateFormat sqlSdf = new SimpleDateFormat(
-"yyyy-MM-dd HH:mm:ss.S");
-static {
-sqlSdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-}
+private static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
+@Override
+protected SimpleDateFormat initialValue() {
+SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss.S");
+return sdf;
+}
+};
+private static ThreadLocal<SimpleDateFormat> sqlSdf = new ThreadLocal<SimpleDateFormat>() {
+@Override
+protected SimpleDateFormat initialValue() {
+SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
+sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+return sdf;
+}
+};
static final ITimeStrategy SYSTEM_TIME_STRATEGY = new SystemTimeStrategy();
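The hunk above swaps a shared, synchronized SimpleDateFormat for per-thread instances; a minimal self-contained sketch of the same ThreadLocal pattern:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class ThreadLocalFormatSketch {
        // SimpleDateFormat is not thread-safe; a ThreadLocal hands every thread
        // its own instance, so no synchronization is needed on the hot path.
        private static final ThreadLocal<SimpleDateFormat> FORMAT =
                new ThreadLocal<SimpleDateFormat>() {
                    @Override
                    protected SimpleDateFormat initialValue() {
                        return new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss.S");
                    }
                };

        public static String format(Date d) {
            return FORMAT.get().format(d); // safe: per-thread formatter
        }

        public static Date parse(String s) throws ParseException {
            return FORMAT.get().parse(s);
        }

        public static void main(String[] args) throws ParseException {
            String s = format(new Date(0L));
            System.out.println(s + " -> " + parse(s).getTime());
        }
    }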
@@ -126,26 +134,15 @@ public class TimeUtil {
/**
* Converts a Calendar in the local time zone to a GMT date
- * 
+ *
* @param cal
* A Calendar object in the local time zone
* @return The GMT date
*/
public static Date calendarToGMT(Calendar cal) {
-Date dt = null;
-synchronized (sdf) {
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-String str = formatCalendar(cal);
-sdf.setTimeZone(TimeZone.getDefault());
-try {
-dt = sdf.parse(str);
-} catch (ParseException e) {
-// TODO Auto-generated catch block
-e.printStackTrace();
-}
-}
-return dt;
+Calendar copy = (Calendar) cal.clone();
+copy.setTimeZone(TimeZone.getTimeZone("GMT"));
+return copy.getTime();
}
/**
@@ -157,7 +154,7 @@
* configured the system to a specific time. Those purposes are handled by
* the {@link SimulatedTime} class. The {@link Date} and {@link Calendar}
* returning methods in this class will delegate to {@link SimulatedTime}.
- * 
+ *
* @see {@link SimulatedTime}
* @return the current time in milliseconds
*/
@@ -167,25 +164,21 @@
/**
* Formats a calendar object into the following format yyyy-MM-dd_HH:mm:ss.S
- * 
+ *
* @param cal
* The calendar to format
* @return The formatted result
*/
public static String formatCalendar(Calendar cal) {
-String format = null;
-synchronized (sdf) {
-sdf.setTimeZone(cal.getTimeZone());
-format = sdf.format(cal.getTime());
-}
-return format;
+SimpleDateFormat mySdf = sdf.get();
+mySdf.setTimeZone(cal.getTimeZone());
+return mySdf.format(cal.getTime());
}
/**
* Retrieve date as a string in the index standard format: yyyy-MM-dd
* kk:mm:ss.SSS
- * 
+ *
* @param aCalendar
* A Calendar instance
* @return The formatted date string from the Calendar instance
@@ -197,7 +190,7 @@
/**
* Retrieve date as a string in the index standard format: yyyy-MM-dd
* kk:mm:ss.SSS
- * 
+ *
* @param aDate
* A Date instance
* @return The formatted date string from the Date instance
@@ -211,7 +204,7 @@
public static long formattedDateToLong(String formattedDate) {
long retVal = 0;
try {
-retVal = sdf.parse(formattedDate).getTime();
+retVal = sdf.get().parse(formattedDate).getTime();
} catch (ParseException e) {
e.printStackTrace();
}
@@ -219,16 +212,14 @@
}
public static String formatToSqlTimestamp(Date aDate) {
-synchronized (sqlSdf) {
-return sqlSdf.format(aDate);
-}
+return sqlSdf.get().format(aDate);
}
/**
* Retrieve a {@link ITimer} instance that will only actually keep track of
* time if the specified priority level is enabled. This allows efficient
* use of system resources, while calling code need not change.
- * 
+ *
* @param handler
* the handler to use to check for a priority level being enabled
* @param priority
@@ -243,7 +234,7 @@
/**
* Retrieve a {@link ITimer} that allows the demarcation of arbitrary start
* and stop times.
- * 
+ *
* @return a {@link ITimer}
*/
public static ITimer getTimer() {
@@ -253,7 +244,7 @@
/**
* Check whether the time represented by a {@link Date} is a new day
* compared to another {@link Date} object.
- * 
+ *
* @param earlierDate
* the earlier date
* @param laterDate
@@ -271,16 +262,16 @@
Calendar laterCal = TimeUtil.newCalendar(timeZone);
laterCal.setTime(laterDate);
-return laterCal.get(Calendar.DAY_OF_YEAR) > earlierCal
-.get(Calendar.DAY_OF_YEAR)
-|| laterCal.get(Calendar.YEAR) > earlierCal.get(Calendar.YEAR);
+return (laterCal.get(Calendar.DAY_OF_YEAR) > earlierCal
+.get(Calendar.DAY_OF_YEAR))
+|| (laterCal.get(Calendar.YEAR) > earlierCal.get(Calendar.YEAR));
}
/**
* Return a new {@link Calendar} instance. This method delegates to the
* {@link SimulatedTime} class to determine the currently configured system
* time.
- * 
+ *
* @see {@link SimulatedTime}
* @return the calendar
*/
@@ -294,7 +285,7 @@
* Return a new {@link Calendar} instance for the specified {@link TimeZone}
* . This method delegates to the {@link SimulatedTime} class to determine
* the currently configured system time.
- * 
+ *
* @param timeZone
* the time zone
* @see {@link SimulatedTime}
@@ -310,7 +301,7 @@
* Return a new {@link Date} instance. This method delegates to the
* {@link SimulatedTime} class to determine the currently configured system
* time.
- * 
+ *
* @see {@link SimulatedTime}
* @return the current {@link Date}
*/
@@ -322,7 +313,7 @@
* Return a new ImmutableDate. This method delegates to the
* {@link SimulatedTime} class to determine the currently configured system
* time.
- * 
+ *
* @see {@link SimulatedTime}
* @return an immutable date for the current time
*/


@@ -663,7 +663,6 @@ public abstract class PluginDao extends CoreDao {
if (rule.isPeriodSpecified()
&& refTime.before(periodCutoffTime)) {
timesPurgedByRule.add(refTime);
} else {
timesKeptByRule.add(refTime);
}
@@ -715,17 +714,16 @@ public abstract class PluginDao extends CoreDao {
for (int i = 0; i < refTimesForKey.size(); i++) {
currentRefTime = refTimesForKey.get(i);
if (i < rule.getVersionsToKeep()) {
// allow for period to override versions to keep
if (rule.isPeriodSpecified()
&& currentRefTime.before(periodCutoffTime)) {
timesPurgedByRule.add(currentRefTime);
} else {
timesKeptByRule.add(currentRefTime);
}
-timesKeptByRule.add(currentRefTime);
} else {
timesPurgedByRule.add(currentRefTime);
}
}
/*
* This rule only specifies a time cutoff
@@ -781,10 +779,6 @@ public abstract class PluginDao extends CoreDao {
// then it will be retained
timesPurged.removeAll(timesKept);
-int itemsDeletedForKey = 0;
-List<Date> orderedTimesPurged = new ArrayList<Date>(timesPurged);
-Collections.sort(orderedTimesPurged);
// flags to control how hdf5 is purged and what needs to be returned
// from the database purge to properly purge hdf5. If purging and
// trackToUri is false, hdf5PurgeDates is used to determine if the
@@ -793,9 +787,10 @@ public abstract class PluginDao extends CoreDao {
// TODO: Update to allow files to not be in hourly granularity
boolean purgeHdf5Data = false;
boolean trackToUri = false;
-Set<Date> hdf5PurgeDates = new HashSet<Date>();
try {
+Set<Date> roundedTimesKept = new HashSet<Date>();
// Determine if this plugin uses HDF5 to store data
purgeHdf5Data = (PluginFactory.getInstance()
.getPluginRecordClass(pluginName).newInstance() instanceof IPersistable);
@@ -822,22 +817,12 @@ public abstract class PluginDao extends CoreDao {
trackToUri = true;
} else {
// need to compare each key to check for optimized
-// purge,
-// all productKeys must be a pathKey for optimized
-// purge,
-// both key lists should be small 3 or less, no need to
-// optimize list look ups
+// purge, all productKeys must be a pathKey for
+// optimized purge, both key lists should be small 3 or
+// less, no need to optimize list look ups
trackToUri = false;
for (String productKey : productKeys.keySet()) {
-boolean keyMatch = false;
-for (String pathKey : pathKeys) {
-if (pathKey.equals(productKey)) {
-keyMatch = true;
-break;
-}
-}
-if (!keyMatch) {
+if (!pathKeys.contains(productKey)) {
trackToUri = true;
break;
}
@@ -849,17 +834,22 @@ public abstract class PluginDao extends CoreDao {
}
// we can optimize purge, sort dates by hour to determine files
-// to drop
+// to drop, also don't remove from metadata if we are keeping
+// the hdf5 around
if (!trackToUri) {
-Set<Date> roundedTimesKept = new HashSet<Date>();
for (Date dateToRound : timesKept) {
roundedTimesKept.add(roundDateToHour(dateToRound));
}
-for (Date dateToRound : timesPurged) {
-Date roundedDate = roundDateToHour(dateToRound);
-if (!roundedTimesKept.contains(roundedDate)) {
-hdf5PurgeDates.add(dateToRound);
-}
-}
+Iterator<Date> purgeTimeIterator = timesPurged.iterator();
+while (purgeTimeIterator.hasNext()) {
+Date purgeTime = purgeTimeIterator.next();
+// keeping this hdf5 file, remove the purge time
+if (roundedTimesKept
+.contains(roundDateToHour(purgeTime))) {
+purgeTimeIterator.remove();
+}
+}
}
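A self-contained sketch of the hour-bucket filtering introduced above: a purge time is dropped when any kept time rounds to the same hour, since the hdf5 file for that hour must survive. The class and method names here, including roundDateToHour, are stand-ins for PluginDao's real helpers:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class HourBucketFilterSketch {
        /** Zeroes minutes and below, mirroring a roundDateToHour(Date) helper. */
        static Date roundDateToHour(Date d) {
            Calendar cal = Calendar.getInstance();
            cal.setTime(d);
            cal.set(Calendar.MINUTE, 0);
            cal.set(Calendar.SECOND, 0);
            cal.set(Calendar.MILLISECOND, 0);
            return cal.getTime();
        }

        /**
         * Drops any purge time whose hour bucket also holds a kept time: the
         * hdf5 file for that hour must survive, so the matching database rows
         * are kept too. The list must be mutable.
         */
        static void dropPurgesSharingAKeptHour(List<Date> timesPurged, Set<Date> timesKept) {
            Set<Date> roundedKept = new HashSet<Date>();
            for (Date kept : timesKept) {
                roundedKept.add(roundDateToHour(kept));
            }
            Iterator<Date> it = timesPurged.iterator();
            while (it.hasNext()) {
                if (roundedKept.contains(roundDateToHour(it.next()))) {
                    it.remove();
                }
            }
        }

        public static void main(String[] args) {
            List<Date> purged = new ArrayList<Date>(Arrays.asList(new Date(10L), new Date(7200000L)));
            Set<Date> kept = new HashSet<Date>(Arrays.asList(new Date(20L)));
            dropPurgesSharingAKeptHour(purged, kept);
            System.out.println(purged); // only the two-hours-in time survives
        }
    }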
@@ -870,23 +860,48 @@ public abstract class PluginDao extends CoreDao {
this.pluginName, e);
}
+int itemsDeletedForKey = 0;
+List<Date> orderedTimesPurged = new ArrayList<Date>(timesPurged);
+Collections.sort(orderedTimesPurged);
Map<String, List<String>> hdf5FileToUriMap = new HashMap<String, List<String>>();
+Date previousRoundedDate = null;
for (Date deleteDate : orderedTimesPurged) {
-boolean purgeHdf5ForRefTime = purgeHdf5Data;
-// if we aren't tracking by uri, check hdf5 date map
-if (purgeHdf5ForRefTime && !trackToUri) {
-purgeHdf5ForRefTime = hdf5PurgeDates.contains(deleteDate);
-}
+Date roundedDate = roundDateToHour(deleteDate);
// Delete the data in the database
int itemsDeletedForTime = purgeDataByRefTime(deleteDate,
-productKeys, purgeHdf5ForRefTime, trackToUri,
-hdf5FileToUriMap);
+productKeys, purgeHdf5Data, trackToUri, hdf5FileToUriMap);
itemsDeletedForKey += itemsDeletedForTime;
+// check if any hdf5 data up to this point can be deleted
+if (purgeHdf5Data
+&& (trackToUri || ((previousRoundedDate != null) && roundedDate
+.after(previousRoundedDate)))) {
+// delete these entries now
+for (Map.Entry<String, List<String>> hdf5Entry : hdf5FileToUriMap
+.entrySet()) {
+try {
+IDataStore ds = DataStoreFactory.getDataStore(new File(
+hdf5Entry.getKey()));
+List<String> uris = hdf5Entry.getValue();
+if (uris == null) {
+ds.deleteFiles(null);
+} else {
+ds.delete(uris.toArray(new String[uris.size()]));
+}
+} catch (Exception e) {
+PurgeLogger.logError("Error occurred purging file: "
++ hdf5Entry.getKey(), this.pluginName, e);
+}
+}
+hdf5FileToUriMap.clear();
+}
}
if (purgeHdf5Data) {
+// delete any remaining data
for (Map.Entry<String, List<String>> hdf5Entry : hdf5FileToUriMap
.entrySet()) {
try {
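The added block flushes the accumulated hdf5 deletes whenever the rounded hour advances (or on every pass when tracking URIs), so pending work stays bounded. A simplified sketch of that flush-as-you-go shape, with a print stub in place of the real datastore delete:

    import java.util.Arrays;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class IncrementalFlushSketch {
        /** Truncates a Date to the hour using plain millisecond arithmetic. */
        static Date roundToHour(Date d) {
            long msPerHour = 60L * 60L * 1000L;
            return new Date((d.getTime() / msPerHour) * msPerHour);
        }

        /** Stand-in for the per-file hdf5 delete. */
        static void deleteFromStore(String file, List<String> uris) {
            System.out.println("purge " + (uris == null ? "entire file" : uris.toString()) + " from " + file);
        }

        /**
         * Walks purge times in ascending order; whenever the hour bucket
         * advances, the deletes accumulated so far are flushed, so pending
         * hdf5 work never grows past roughly one file's worth.
         */
        static void purge(List<Date> orderedTimesPurged, Map<String, List<String>> fileToUris) {
            Date previousRounded = null;
            for (Date deleteDate : orderedTimesPurged) {
                Date rounded = roundToHour(deleteDate);
                // ... database rows for deleteDate are purged here, and the
                // URIs they map to are appended to fileToUris ...
                if (previousRounded != null && rounded.after(previousRounded)) {
                    for (Map.Entry<String, List<String>> e : fileToUris.entrySet()) {
                        deleteFromStore(e.getKey(), e.getValue());
                    }
                    fileToUris.clear();
                }
                previousRounded = rounded;
            }
            // flush whatever is left after the loop
            for (Map.Entry<String, List<String>> e : fileToUris.entrySet()) {
                deleteFromStore(e.getKey(), e.getValue());
            }
            fileToUris.clear();
        }

        public static void main(String[] args) {
            Map<String, List<String>> pending = new HashMap<String, List<String>>();
            pending.put("/data/sample.h5", Arrays.asList("/group/a", "/group/b"));
            purge(Arrays.asList(new Date(0L), new Date(3600000L)), pending);
        }
    }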
@@ -1129,6 +1144,7 @@ public abstract class PluginDao extends CoreDao {
throws DataAccessLayerException {
int results = 0;
DatabaseQuery dataQuery = new DatabaseQuery(this.daoClass);
if (refTime != null) {
@@ -1141,10 +1157,8 @@ public abstract class PluginDao extends CoreDao {
}
}
-List<Integer> idList = null;
-DatabaseQuery idQuery = null;
+List<PluginDataObject> pdos = null;
dataQuery.addReturnedField("id");
dataQuery.setMaxResults(500);
// fields for hdf5 purge
@@ -1152,12 +1166,8 @@ public abstract class PluginDao extends CoreDao {
StringBuilder pathBuilder = new StringBuilder();
do {
-idList = (List<Integer>) this.queryByCriteria(dataQuery);
-if (!idList.isEmpty()) {
-idQuery = new DatabaseQuery(this.daoClass);
-idQuery.addQueryParam("id", idList, QueryOperand.IN);
-List<PluginDataObject> pdos = (List<PluginDataObject>) this
-.queryByCriteria(idQuery);
+pdos = (List<PluginDataObject>) this.queryByCriteria(dataQuery);
+if ((pdos != null) && !pdos.isEmpty()) {
this.delete(pdos);
if (trackHdf5 && (hdf5FileToUriPurged != null)) {
@@ -1197,7 +1207,7 @@ public abstract class PluginDao extends CoreDao {
results += pdos.size();
}
-} while ((idList != null) && !idList.isEmpty());
+} while ((pdos != null) && !pdos.isEmpty());
return results;
}
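The reworked loop now fetches plugin data objects straight from the query and deletes in fixed-size batches until nothing comes back. Its shape, over a hypothetical Dao interface:

    import java.util.List;

    public class BatchPurgeSketch {
        /** Minimal stand-in for the DAO: fetch up to max matches, delete them. */
        interface Dao<T> {
            List<T> queryBatch(int maxResults);
            void delete(List<T> rows);
        }

        /** Deletes matching rows in fixed-size batches; returns the count. */
        static <T> int purgeAll(Dao<T> dao, int batchSize) {
            int results = 0;
            List<T> pdos;
            do {
                // re-running the same query is safe: each delete shrinks the
                // result set, so the loop must terminate
                pdos = dao.queryBatch(batchSize);
                if (pdos != null && !pdos.isEmpty()) {
                    dao.delete(pdos);
                    results += pdos.size();
                }
            } while (pdos != null && !pdos.isEmpty());
            return results;
        }
    }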
@@ -1437,8 +1447,9 @@ public abstract class PluginDao extends CoreDao {
// allow zero length file to disable purge for this plugin
if (rulesFile.length() > 0) {
try {
-PurgeRuleSet purgeRules = (PurgeRuleSet) SerializationUtil
-.jaxbUnmarshalFromXmlFile(rulesFile);
+PurgeRuleSet purgeRules = SerializationUtil
+.jaxbUnmarshalFromXmlFile(PurgeRuleSet.class,
+rulesFile);
// ensure there's a default rule
if (purgeRules.getDefaultRules() == null) {
@@ -1469,6 +1480,7 @@
"EDEX");
return null;
}
try {
PurgeRuleSet purgeRules = SerializationUtil
.jaxbUnmarshalFromXmlFile(PurgeRuleSet.class, defaultRule);
@@ -1477,6 +1489,7 @@
PurgeLogger.logError("Error deserializing default purge rule!",
"DEFAULT");
}
return null;
}


@@ -25,10 +25,10 @@ import java.util.List;
import java.util.Map;
/**
- * Tree representation of the purge rules. Each Node can contain a PurgeRule as
- * well as a collection of other Nodes. Each Node should be a specific purge key
- * value based on the PurgeRuleSet keys. A given set of key/value pairs will
- * return the most significant purge key that matches.
+ * Tree representation of the purge rules. Each Node can contain a List of
+ * PurgeRule as well as a collection of other Nodes. Each Node should be a
+ * specific purge key value based on the PurgeRuleSet keys. A given set of
+ * key/value pairs will return the most significant purge key that matches.
*
* <pre>
*
@@ -54,7 +54,7 @@ public class PurgeRuleTree {
for (PurgeRule rule : rules) {
PurgeNode curNode = root;
List<String> values = rule.getKeyValues();
-if (values != null) {
+if ((values != null) && !values.isEmpty()) {
// descend purge tree
for (String val : values) {
Map<String, PurgeNode> childNodes = curNode
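The added isEmpty() check keeps default rules (which carry no key values) on the root node instead of descending. A minimal sketch of that descent over a hypothetical PurgeNode:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PurgeTreeSketch {
        /** Hypothetical node: children keyed by purge key value. */
        static class PurgeNode {
            final Map<String, PurgeNode> children = new HashMap<String, PurgeNode>();
        }

        /** Walks key values from the root, creating nodes as needed. */
        static PurgeNode descend(PurgeNode root, List<String> values) {
            PurgeNode cur = root;
            if ((values != null) && !values.isEmpty()) { // default rules stay on the root
                for (String val : values) {
                    PurgeNode child = cur.children.get(val);
                    if (child == null) {
                        child = new PurgeNode();
                        cur.children.put(val, child);
                    }
                    cur = child;
                }
            }
            return cur;
        }

        public static void main(String[] args) {
            PurgeNode root = new PurgeNode();
            descend(root, Arrays.asList("grid", "ETA"));
            descend(root, null); // default rule: no key values, attaches at the root
            System.out.println(root.children.keySet()); // [grid]
        }
    }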


@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<purgeRuleSet>
<key>info.datasetId</key>
-<!-- Default rule for data not covered by any rules -->
-<defaultRule>
-<versionsToKeep>2</versionsToKeep>
-</defaultRule>
+<!-- Default rule for data not covered by any rules, 2 versions up to 7 days-->
+<defaultRule>
+<versionsToKeep>2</versionsToKeep>
+<period>07-00:00:00</period>
+</defaultRule>
<!-- Purge rule for the NAM80 (ETA) model -->
<rule>
<keyValue>ETA</keyValue>


@@ -322,9 +322,16 @@ class H5pyDataStore(IDataStore.IDataStore):
try:
    locs = request.getLocations()
    for dataset in locs:
-        ds = self.__getNode(f, None, dataset)
-        grp = ds.parent
-        grp.id.unlink(ds.name)
+        ds = None
+        try :
+            ds = self.__getNode(f, None, dataset)
+        except Exception, e:
+            logger.warn('Unable to find uri [' + str(dataset) + '] in file [' + str(fn) + '] to delete: ' + IDataStore._exc())
+        if ds:
+            grp = ds.parent
+            grp.id.unlink(ds.name)
finally:
    # check if file has any remaining data sets