Issue #1421: Make hdf5 and database purge the same, update H5pyDataStore

Change-Id: I9cbe9c5cb185b4eb999b0c0a2d6ff6b6d754d0e7

Author: Richard Peter
Date: 2012-12-12 17:09:26 -06:00
parent 752a339b66
commit 1f52ca2994
7 changed files with 144 additions and 132 deletions

GFEDao.java

@@ -437,7 +437,6 @@ public class GFEDao extends DefaultPluginDao {
             }
         });
-        // we gain nothing by removing from hdf5
         Map<File, Pair<List<TimeRange>, String[]>> fileMap = GfeUtil
                 .getHdf5FilesAndGroups(GridDatabase.gfeBaseDataDir, parmId,
                         times);
@@ -445,20 +444,21 @@
                 .entrySet()) {
             File hdf5File = entry.getKey();
             IDataStore dataStore = DataStoreFactory.getDataStore(hdf5File);
+            String[] groupsToDelete = entry.getValue().getSecond();
             try {
-                String[] groupsToDelete = entry.getValue().getSecond();
-                for (String grp : groupsToDelete) {
-                    dataStore.delete(grp);
-                }
+                dataStore.delete(groupsToDelete);
+                if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
                     statusHandler.handle(Priority.DEBUG,
                             "Deleted: " + Arrays.toString(groupsToDelete)
                                     + " from " + hdf5File.getName());
+                }
             } catch (Exception e) {
-                statusHandler.handle(Priority.PROBLEM,
-                        "Error deleting hdf5 records", e);
+                statusHandler.handle(
+                        Priority.WARN,
+                        "Error deleting hdf5 record(s) from file: "
+                                + hdf5File.getPath(), e);
             }
         }
     }

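The GFEDao change above batches the HDF5 group deletes into a single IDataStore.delete(...) call and only builds the debug string when DEBUG logging is enabled. Below is a minimal, self-contained sketch of that pattern; the DataStore and Logger interfaces are hypothetical stand-ins for illustration, not the AWIPS APIs.

import java.util.Arrays;

public class BatchDeleteSketch {

    /** Hypothetical stand-in for the data store; only the batch delete matters here. */
    interface DataStore {
        void delete(String[] groups) throws Exception;
    }

    /** Hypothetical stand-in for the priority-aware status handler. */
    interface Logger {
        boolean isDebugEnabled();
        void debug(String msg);
        void warn(String msg, Exception e);
    }

    /** Delete all groups in one call and only format the debug message when needed. */
    static void purgeGroups(DataStore store, Logger log, String fileName,
            String[] groupsToDelete) {
        try {
            // one round trip instead of one delete per group
            store.delete(groupsToDelete);
            if (log.isDebugEnabled()) {
                // string concatenation is skipped entirely when DEBUG is off
                log.debug("Deleted: " + Arrays.toString(groupsToDelete)
                        + " from " + fileName);
            }
        } catch (Exception e) {
            log.warn("Error deleting hdf5 record(s) from file: " + fileName, e);
        }
    }
}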
Purge rules XML keyed on modelInfo.modelName

@@ -1,10 +1,10 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
 <purgeRuleSet>
     <key>modelInfo.modelName</key>
-    <!-- Default rule for data not covered by any rules -->
+    <!-- Default rule for data not covered by any rules, 2 versions up to 7 days-->
     <defaultRule>
         <versionsToKeep>2</versionsToKeep>
-        <period>02-00:00:00</period>
+        <period>07-00:00:00</period>
     </defaultRule>
     <!-- Purge rule for the NAM80 (ETA) model -->
     <rule>

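For orientation, the defaultRule above combines a version count with a period: keep at most 2 versions, but let the 07-00:00:00 (7 day) period override that and purge anything older. The sketch below illustrates that selection logic under the assumption that reference times are sorted newest first; it is an illustration of the rule semantics, not the EDEX purge code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DefaultRuleSketch {

    /** Newest-first reference times in, times to purge out: keep at most
     *  versionsToKeep, but purge anything older than periodMillis. */
    static List<Date> timesToPurge(List<Date> newestFirst, int versionsToKeep,
            long periodMillis, Date now) {
        Date cutoff = new Date(now.getTime() - periodMillis);
        List<Date> purged = new ArrayList<Date>();
        for (int i = 0; i < newestFirst.size(); i++) {
            Date refTime = newestFirst.get(i);
            if (i >= versionsToKeep || refTime.before(cutoff)) {
                purged.add(refTime);
            }
        }
        return purged;
    }

    public static void main(String[] args) {
        Date now = new Date();
        Date oneDayOld = new Date(now.getTime() - TimeUnit.DAYS.toMillis(1));
        Date eightDaysOld = new Date(now.getTime() - TimeUnit.DAYS.toMillis(8));
        // 2 versions, 7 day period: the 8 day old version is purged even though
        // it is one of the newest two.
        System.out.println(timesToPurge(Arrays.asList(oneDayOld, eightDaysOld),
                2, TimeUnit.DAYS.toMillis(7), now));
    }
}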
TimeUtil.java

@@ -102,19 +102,27 @@ public class TimeUtil {
     public static final int SECONDS_PER_MINUTE = 60;
 
-    // create instance of simple date format on class load, as instantiating it
-    // is expensive the SimpleDateFormat class is not thread-safe,
-    // so calling methods use synchronized
-    private static SimpleDateFormat sdf = new SimpleDateFormat(
-            "yyyy-MM-dd_HH:mm:ss.S");
-
-    private static SimpleDateFormat sqlSdf = new SimpleDateFormat(
-            "yyyy-MM-dd HH:mm:ss.S");
-
-    static {
-        sqlSdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-    }
+    private static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
+
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss.S");
+            return sdf;
+        }
+    };
+
+    private static ThreadLocal<SimpleDateFormat> sqlSdf = new ThreadLocal<SimpleDateFormat>() {
+
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
+            sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+            return sdf;
+        }
+    };
 
     static final ITimeStrategy SYSTEM_TIME_STRATEGY = new SystemTimeStrategy();
 
     static final ITimer NULL_CLOCK = new NullClock();
 
@@ -132,20 +140,9 @@ public class TimeUtil {
      * @return The GMT date
      */
     public static Date calendarToGMT(Calendar cal) {
-        Date dt = null;
-        synchronized (sdf) {
-            sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-            String str = formatCalendar(cal);
-            sdf.setTimeZone(TimeZone.getDefault());
-            try {
-                dt = sdf.parse(str);
-            } catch (ParseException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        }
-        return dt;
+        Calendar copy = (Calendar) cal.clone();
+        copy.setTimeZone(TimeZone.getTimeZone("GMT"));
+        return copy.getTime();
     }
 
     /**
@@ -173,13 +170,9 @@ public class TimeUtil {
      * @return The formatted result
      */
     public static String formatCalendar(Calendar cal) {
-        String format = null;
-        synchronized (sdf) {
-            sdf.setTimeZone(cal.getTimeZone());
-            format = sdf.format(cal.getTime());
-        }
-        return format;
+        SimpleDateFormat mySdf = sdf.get();
+        mySdf.setTimeZone(cal.getTimeZone());
+        return mySdf.format(cal.getTime());
     }
 
     /**
@@ -211,7 +204,7 @@ public class TimeUtil {
     public static long formattedDateToLong(String formattedDate) {
         long retVal = 0;
         try {
-            retVal = sdf.parse(formattedDate).getTime();
+            retVal = sdf.get().parse(formattedDate).getTime();
         } catch (ParseException e) {
             e.printStackTrace();
         }
@@ -219,9 +212,7 @@ public class TimeUtil {
     }
 
     public static String formatToSqlTimestamp(Date aDate) {
-        synchronized (sqlSdf) {
-            return sqlSdf.format(aDate);
-        }
+        return sqlSdf.get().format(aDate);
     }
 
     /**
@@ -271,9 +262,9 @@ public class TimeUtil {
         Calendar laterCal = TimeUtil.newCalendar(timeZone);
         laterCal.setTime(laterDate);
 
-        return laterCal.get(Calendar.DAY_OF_YEAR) > earlierCal
-                .get(Calendar.DAY_OF_YEAR)
-                || laterCal.get(Calendar.YEAR) > earlierCal.get(Calendar.YEAR);
+        return (laterCal.get(Calendar.DAY_OF_YEAR) > earlierCal
+                .get(Calendar.DAY_OF_YEAR))
+                || (laterCal.get(Calendar.YEAR) > earlierCal.get(Calendar.YEAR));
     }
 
     /**

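The TimeUtil change replaces synchronized access to shared SimpleDateFormat instances with per-thread instances, since SimpleDateFormat is not thread-safe. A minimal, self-contained sketch of the ThreadLocal pattern used above (class and field names here are illustrative only):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class ThreadLocalFormatSketch {

    // each thread lazily gets its own SimpleDateFormat, so no synchronization
    // is needed around format()/parse()
    private static final ThreadLocal<SimpleDateFormat> SQL_FORMAT = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
            sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
            return sdf;
        }
    };

    public static String formatToSqlTimestamp(Date date) {
        return SQL_FORMAT.get().format(date);
    }

    public static void main(String[] args) {
        System.out.println(formatToSqlTimestamp(new Date()));
    }
}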
PluginDao.java

@@ -663,7 +663,6 @@ public abstract class PluginDao extends CoreDao {
                         if (rule.isPeriodSpecified()
                                 && refTime.before(periodCutoffTime)) {
                             timesPurgedByRule.add(refTime);
                         } else {
                             timesKeptByRule.add(refTime);
                         }
@@ -715,17 +714,16 @@
                 for (int i = 0; i < refTimesForKey.size(); i++) {
                     currentRefTime = refTimesForKey.get(i);
                     if (i < rule.getVersionsToKeep()) {
+                        // allow for period to override versions to keep
                         if (rule.isPeriodSpecified()
                                 && currentRefTime.before(periodCutoffTime)) {
                             timesPurgedByRule.add(currentRefTime);
                         } else {
                             timesKeptByRule.add(currentRefTime);
                         }
-                        timesKeptByRule.add(currentRefTime);
                     } else {
                         timesPurgedByRule.add(currentRefTime);
                     }
                 }
 
                 /*
                  * This rule only specifies a time cutoff
@@ -781,10 +779,6 @@
        // then it will be retained
        timesPurged.removeAll(timesKept);
 
-        int itemsDeletedForKey = 0;
-        List<Date> orderedTimesPurged = new ArrayList<Date>(timesPurged);
-        Collections.sort(orderedTimesPurged);
-
        // flags to control how hdf5 is purged and what needs to be returned
        // from the database purge to properly purge hdf5. If purging and
        // trackToUri is false, hdf5PurgeDates is used to determine if the
@@ -793,9 +787,10 @@
        // TODO: Update to allow files to not be in hourly granularity
        boolean purgeHdf5Data = false;
        boolean trackToUri = false;
-        Set<Date> hdf5PurgeDates = new HashSet<Date>();
 
        try {
+            Set<Date> roundedTimesKept = new HashSet<Date>();
+
            // Determine if this plugin uses HDF5 to store data
            purgeHdf5Data = (PluginFactory.getInstance()
                    .getPluginRecordClass(pluginName).newInstance() instanceof IPersistable);
@@ -822,22 +817,12 @@
                        trackToUri = true;
                    } else {
                        // need to compare each key to check for optimized
-                        // purge,
-                        // all productKeys must be a pathKey for optimized
-                        // purge,
-                        // both key lists should be small 3 or less, no need to
-                        // optimize list look ups
+                        // purge, all productKeys must be a pathKey for
+                        // optimized purge, both key lists should be small 3 or
+                        // less, no need to optimize list look ups
                        trackToUri = false;
                        for (String productKey : productKeys.keySet()) {
-                            boolean keyMatch = false;
-                            for (String pathKey : pathKeys) {
-                                if (pathKey.equals(productKey)) {
-                                    keyMatch = true;
-                                    break;
-                                }
-                            }
-                            if (!keyMatch) {
+                            if (!pathKeys.contains(productKey)) {
                                trackToUri = true;
                                break;
                            }
@@ -849,17 +834,22 @@
            }
 
            // we can optimize purge, sort dates by hour to determine files
-            // to drop
+            // to drop, also don't remove from metadata if we are keeping
+            // the hdf5 around
            if (!trackToUri) {
-                Set<Date> roundedTimesKept = new HashSet<Date>();
                for (Date dateToRound : timesKept) {
                    roundedTimesKept.add(roundDateToHour(dateToRound));
                }
-                for (Date dateToRound : timesPurged) {
-                    Date roundedDate = roundDateToHour(dateToRound);
-                    if (!roundedTimesKept.contains(roundedDate)) {
-                        hdf5PurgeDates.add(dateToRound);
+
+                Iterator<Date> purgeTimeIterator = timesPurged.iterator();
+
+                while (purgeTimeIterator.hasNext()) {
+                    Date purgeTime = purgeTimeIterator.next();
+                    // keeping this hdf5 file, remove the purge time
+                    if (roundedTimesKept
+                            .contains(roundDateToHour(purgeTime))) {
+                        purgeTimeIterator.remove();
                    }
                }
            }
@@ -870,23 +860,48 @@
                    this.pluginName, e);
        }
 
+        int itemsDeletedForKey = 0;
+        List<Date> orderedTimesPurged = new ArrayList<Date>(timesPurged);
+        Collections.sort(orderedTimesPurged);
+
        Map<String, List<String>> hdf5FileToUriMap = new HashMap<String, List<String>>();
+        Date previousRoundedDate = null;
        for (Date deleteDate : orderedTimesPurged) {
-            boolean purgeHdf5ForRefTime = purgeHdf5Data;
-            // if we aren't tracking by uri, check hdf5 date map
-            if (purgeHdf5ForRefTime && !trackToUri) {
-                purgeHdf5ForRefTime = hdf5PurgeDates.contains(deleteDate);
-            }
+            Date roundedDate = roundDateToHour(deleteDate);
 
            // Delete the data in the database
            int itemsDeletedForTime = purgeDataByRefTime(deleteDate,
-                    productKeys, purgeHdf5ForRefTime, trackToUri,
-                    hdf5FileToUriMap);
+                    productKeys, purgeHdf5Data, trackToUri, hdf5FileToUriMap);
 
            itemsDeletedForKey += itemsDeletedForTime;
+
+            // check if any hdf5 data up to this point can be deleted
+            if (purgeHdf5Data
+                    && (trackToUri || ((previousRoundedDate != null) && roundedDate
+                            .after(previousRoundedDate)))) {
+                // delete these entries now
+                for (Map.Entry<String, List<String>> hdf5Entry : hdf5FileToUriMap
+                        .entrySet()) {
+                    try {
+                        IDataStore ds = DataStoreFactory.getDataStore(new File(
+                                hdf5Entry.getKey()));
+                        List<String> uris = hdf5Entry.getValue();
+                        if (uris == null) {
+                            ds.deleteFiles(null);
+                        } else {
+                            ds.delete(uris.toArray(new String[uris.size()]));
+                        }
+                    } catch (Exception e) {
+                        PurgeLogger.logError("Error occurred purging file: "
+                                + hdf5Entry.getKey(), this.pluginName, e);
+                    }
+                }
+                hdf5FileToUriMap.clear();
+            }
        }
 
        if (purgeHdf5Data) {
+            // delete any remaining data
            for (Map.Entry<String, List<String>> hdf5Entry : hdf5FileToUriMap
                    .entrySet()) {
                try {
@@ -1129,6 +1144,7 @@
            throws DataAccessLayerException {
        int results = 0;
        DatabaseQuery dataQuery = new DatabaseQuery(this.daoClass);
+
        if (refTime != null) {
@@ -1141,10 +1157,8 @@
            }
        }
 
-        List<Integer> idList = null;
-        DatabaseQuery idQuery = null;
-        dataQuery.addReturnedField("id");
+        List<PluginDataObject> pdos = null;
        dataQuery.setMaxResults(500);
 
        // fields for hdf5 purge
@@ -1152,12 +1166,8 @@
        StringBuilder pathBuilder = new StringBuilder();
 
        do {
-            idList = (List<Integer>) this.queryByCriteria(dataQuery);
-            if (!idList.isEmpty()) {
-                idQuery = new DatabaseQuery(this.daoClass);
-                idQuery.addQueryParam("id", idList, QueryOperand.IN);
-                List<PluginDataObject> pdos = (List<PluginDataObject>) this
-                        .queryByCriteria(idQuery);
+            pdos = (List<PluginDataObject>) this.queryByCriteria(dataQuery);
+            if ((pdos != null) && !pdos.isEmpty()) {
                this.delete(pdos);
 
                if (trackHdf5 && (hdf5FileToUriPurged != null)) {
@@ -1197,7 +1207,7 @@
                results += pdos.size();
            }
 
-        } while ((idList != null) && !idList.isEmpty());
+        } while ((pdos != null) && !pdos.isEmpty());
 
        return results;
    }
@@ -1437,8 +1447,9 @@
        // allow zero length file to disable purge for this plugin
        if (rulesFile.length() > 0) {
            try {
-                PurgeRuleSet purgeRules = (PurgeRuleSet) SerializationUtil
-                        .jaxbUnmarshalFromXmlFile(rulesFile);
+                PurgeRuleSet purgeRules = SerializationUtil
+                        .jaxbUnmarshalFromXmlFile(PurgeRuleSet.class,
+                                rulesFile);
 
                // ensure there's a default rule
                if (purgeRules.getDefaultRules() == null) {
@@ -1469,6 +1480,7 @@
                    "EDEX");
            return null;
        }
+
        try {
            PurgeRuleSet purgeRules = SerializationUtil
                    .jaxbUnmarshalFromXmlFile(PurgeRuleSet.class, defaultRule);
@@ -1477,6 +1489,7 @@
            PurgeLogger.logError("Error deserializing default purge rule!",
                    "DEFAULT");
        }
+
        return null;
    }

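The PluginDao change above keys the HDF5 purge to hour-granularity files: purge times whose rounded hour still contains kept data are dropped from the purge list so the shared file is not removed, and the accumulated file/URI map is flushed whenever the loop crosses into a new hour (or per record when tracking by URI). The following standalone sketch illustrates only the hour-bucket filtering step; roundDateToHour here is a hypothetical equivalent of the DAO's helper, not the actual implementation.

import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;

public class HourBucketPurgeSketch {

    /** Hypothetical equivalent of the DAO's roundDateToHour: truncate to the hour. */
    static Date roundDateToHour(Date d) {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
        cal.setTime(d);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        return cal.getTime();
    }

    /** Remove purge times that fall in an hour bucket we are keeping, so the
     *  hourly HDF5 file backing that bucket is left alone. */
    static void dropPurgeTimesInKeptHours(List<Date> timesPurged, List<Date> timesKept) {
        Set<Date> roundedTimesKept = new HashSet<Date>();
        for (Date kept : timesKept) {
            roundedTimesKept.add(roundDateToHour(kept));
        }
        Iterator<Date> it = timesPurged.iterator();
        while (it.hasNext()) {
            if (roundedTimesKept.contains(roundDateToHour(it.next()))) {
                it.remove();
            }
        }
    }
}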
PurgeRuleTree.java

@@ -25,10 +25,10 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Tree representation of the purge rules. Each Node can contain a PurgeRule as
- * well as a collection of other Nodes. Each Node should be a specific purge key
- * value based on the PurgeRuleSet keys. A given set of key/value pairs will
- * return the most significant purge key that matches.
+ * Tree representation of the purge rules. Each Node can contain a List of
+ * PurgeRule as well as a collection of other Nodes. Each Node should be a
+ * specific purge key value based on the PurgeRuleSet keys. A given set of
+ * key/value pairs will return the most significant purge key that matches.
  *
  * <pre>
  *
@@ -54,7 +54,7 @@ public class PurgeRuleTree {
         for (PurgeRule rule : rules) {
             PurgeNode curNode = root;
             List<String> values = rule.getKeyValues();
-            if (values != null) {
+            if ((values != null) && !values.isEmpty()) {
                 // descend purge tree
                 for (String val : values) {
                     Map<String, PurgeNode> childNodes = curNode

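The PurgeRuleTree javadoc above describes a trie-like structure: rules are inserted along their key values, and a lookup walks the same path, keeping the deepest node that actually has rules. Here is a compact sketch of that idea with simplified node and rule types; it only illustrates the shape of the tree, not the actual PurgeRuleTree API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PurgeTreeSketch {

    /** Simplified node: a list of rules plus children keyed by the next key value. */
    static class Node {
        final List<String> rules = new ArrayList<String>();
        final Map<String, Node> children = new HashMap<String, Node>();
    }

    private final Node root = new Node();

    /** Insert a rule along its key values (e.g. ["GFS", "12"]); empty values mean a default rule. */
    void addRule(List<String> keyValues, String rule) {
        Node cur = root;
        if ((keyValues != null) && !keyValues.isEmpty()) {
            for (String val : keyValues) {
                Node child = cur.children.get(val);
                if (child == null) {
                    child = new Node();
                    cur.children.put(val, child);
                }
                cur = child;
            }
        }
        cur.rules.add(rule);
    }

    /** Walk the tree with the data's key values, remembering the most specific
     *  node that actually has rules. */
    List<String> findRules(List<String> keyValues) {
        Node cur = root;
        List<String> best = cur.rules;
        for (String val : keyValues) {
            cur = cur.children.get(val);
            if (cur == null) {
                break;
            }
            if (!cur.rules.isEmpty()) {
                best = cur.rules;
            }
        }
        return best;
    }
}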
Purge rules XML keyed on info.datasetId

@@ -1,9 +1,10 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
 <purgeRuleSet>
     <key>info.datasetId</key>
-    <!-- Default rule for data not covered by any rules -->
+    <!-- Default rule for data not covered by any rules, 2 versions up to 7 days-->
     <defaultRule>
         <versionsToKeep>2</versionsToKeep>
+        <period>07-00:00:00</period>
     </defaultRule>
     <!-- Purge rule for the NAM80 (ETA) model -->
     <rule>

H5pyDataStore.py

@@ -322,7 +322,14 @@ class H5pyDataStore(IDataStore.IDataStore):
         try:
             locs = request.getLocations()
             for dataset in locs:
-                ds = self.__getNode(f, None, dataset)
-                grp = ds.parent
-                grp.id.unlink(ds.name)
+                ds = None
+                try :
+                    ds = self.__getNode(f, None, dataset)
+                except Exception, e:
+                    logger.warn('Unable to find uri [' + str(dataset) + '] in file [' + str(fn) + '] to delete: ' + IDataStore._exc())
+                if ds:
+                    grp = ds.parent
+                    grp.id.unlink(ds.name)