Issue #20: Committing refactored archiver capability

Change-Id: Ifc07621d2ad7b315a8d8391ae6ae6608516ba3bf

Issue #20: Address peer review comments from previous commit.

Change-Id: Ifc07621d2ad7b315a8d8391ae6ae6608516ba3bf

David Gillingham 2012-02-09 11:34:32 -06:00
parent d0735ec99a
commit 9a17298fb6
23 changed files with 486 additions and 146 deletions

.gitignore
View file

@@ -8,3 +8,5 @@ bin-test/
 /Installer.rpm
+/Installer.rpm
+*.pyo

View file

@@ -245,8 +245,14 @@ public class RemoteDataStore implements IDataStore {
     }

     @Override
-    public void repack(Compression compression, String outputDir,
-            String timestampCheck) throws StorageException {
+    public void repack(Compression compression) throws StorageException {
         throw new StorageException("Operation not supported", null);
     }

+    @Override
+    public void copy(String outputDir, Compression compression,
+            String timestampCheck, int minMillisSinceLastChange,
+            int maxMillisSinceLastChange) throws StorageException {
+        throw new StorageException("Operation not supported", null);
+    }

View file

@@ -379,13 +379,26 @@ public class CachingDataStore implements IDataStore {
      * (non-Javadoc)
      *
      * @see
-     * com.raytheon.uf.common.datastorage.IDataStore#repack(java.lang.String,
-     * com.raytheon.uf.common.datastorage.StorageProperties.Compression)
+     * com.raytheon.uf.common.datastorage.IDataStore#repack(com.raytheon.uf.
+     * common.datastorage.StorageProperties.Compression)
      */
     @Override
-    public void repack(Compression compression, String outputDir,
-            String timestampCheck) throws StorageException {
-        delegate.repack(compression, outputDir, timestampCheck);
+    public void repack(Compression compression) throws StorageException {
+        delegate.repack(compression);
     }

+    /*
+     * (non-Javadoc)
+     *
+     * @see com.raytheon.uf.common.datastorage.IDataStore#copy(java.lang.String,
+     * com.raytheon.uf.common.datastorage.StorageProperties.Compression,
+     * java.lang.String, int, int)
+     */
+    @Override
+    public void copy(String outputDir, Compression compression,
+            String timestampCheck, int minMillisSinceLastChange,
+            int maxMillisSinceLastChange) throws StorageException {
+        delegate.copy(outputDir, compression, timestampCheck,
+                minMillisSinceLastChange, maxMillisSinceLastChange);
+    }
 }

View file

@@ -26,4 +26,4 @@ acarssounding.cron=00+10,30,50+*+*+*+?
 gfe.cron=0+15+*+*+*+?
 repack.cron=0+20+*+*+*+?
 # runs database and hdf5 archive for archive server to pull data from
-archive.cron=0+0+*+*+*+?
+archive.cron=0+40+*+*+*+?
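
These entries appear to be Quartz cron expressions with `+` standing in for URL-encoded spaces in the EDEX endpoint URIs; decoded, the new value reads `0 40 * * * ?`, i.e. fire at 40 minutes past every hour, offset from the repack run at 20 past. A minimal sketch of decoding and sanity-checking such an entry with the Quartz library (the class name `CronCheck` is illustrative, not part of this commit):

```java
import java.text.ParseException;
import java.util.Date;

import org.quartz.CronExpression;

public class CronCheck {
    public static void main(String[] args) throws ParseException {
        // assumption: '+' encodes a space in these property values
        String raw = "0+40+*+*+*+?";
        CronExpression cron = new CronExpression(raw.replace('+', ' '));
        // prints the next time the archive job would fire
        System.out.println(cron.getNextValidTimeAfter(new Date()));
    }
}
```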

View file

@@ -1960,8 +1960,14 @@ public class HDF5DataStore implements IDataStore {
     }

     @Override
-    public void repack(Compression compression, String outputDir,
-            String timestampCheck) throws StorageException {
+    public void repack(Compression compression) throws StorageException {
         throw new StorageException("Operation not supported", null);
     }

+    @Override
+    public void copy(String outputDir, Compression compression,
+            String timestampCheck, int minMillisSinceLastChange,
+            int maxMillisSinceLastChange) throws StorageException {
+        throw new StorageException("Operation not supported", null);
+    }
 }

View file

@@ -257,10 +257,20 @@ public interface IDataStore extends ISerializableObject {
      *
      * @param compression
      *            the type of compression to repack with
+     */
+    public void repack(Compression compression) throws StorageException;
+
+    /**
+     * Recursively copies all files of a certain directory. If compression is
+     * specified the file will be repacked to the specified compression.
+     * Presumes that the IDataStore instance is tied to a directory, not a
+     * specific file.
+     *
      * @param outputDir
-     *            the output directory to put the repacked files, or null if the
-     *            same dir. If the same dir is used, it will delete the original
-     *            file.
+     *            the output directory to put the copied files
+     * @param compression
+     *            If specified will repack the output file with a given
+     *            compression
      * @param timestampCheck
      *            if not null, the attribute to check on the file for a
      *            timestamp of the last time this particular action was run.
@@ -269,8 +279,18 @@ public interface IDataStore extends ISerializableObject {
      *            future requests for the same file will check this attribute
      *            and if the file has not been modified since last run, the file
      *            will be skipped.
+     * @param minMillisSinceLastChange
+     *            if greater than 0, the last modified time on the file cannot
+     *            be within minMillisSinceLastChange from current time. This is
+     *            used to not repack files that have changed within a recent
+     *            threshold.
+     * @param maxMillisSinceLastChange
+     *            if greater than 0, the last modified time on the file must be
+     *            within maxMillisSinceLastChange from current time. This is
+     *            used to ignore files that have not changed within a recent
+     *            threshold.
      */
-    public void repack(Compression compression, String outputDir,
-            String timestampCheck) throws StorageException;
+    public void copy(String outputDir, Compression compression,
+            String timestampCheck, int minMillisSinceLastChange,
+            int maxMillisSinceLastChange) throws StorageException;
 }
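
Taken together, `repack` now only recompresses a directory in place, while `copy` handles the timestamp-gated export. A minimal caller-side sketch of the new `copy` contract (the directory paths and the 30-minute/48-hour window are illustrative assumptions, not values from this commit):

```java
import java.io.File;

import com.raytheon.uf.common.datastorage.DataStoreFactory;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.datastorage.StorageException;
import com.raytheon.uf.common.datastorage.StorageProperties.Compression;

public class CopyExample {
    public static void main(String[] args) {
        // copy() presumes the data store is tied to a directory, not a file
        IDataStore ds = DataStoreFactory.getDataStore(new File(
                "/awips2/edex/data/hdf5/binlightning"));
        try {
            // export files last modified between 30 minutes and 48 hours ago,
            // repacking to LZF on the way out; the "lastArchived" attribute
            // records when each file was last processed
            ds.copy("/archive/binlightning", Compression.LZF, "lastArchived",
                    30 * 60 * 1000, 48 * 60 * 60 * 1000);
        } catch (StorageException e) {
            e.printStackTrace();
        }
    }
}
```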

View file

@@ -15,6 +15,7 @@ import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
 import com.raytheon.uf.common.datastorage.StorageStatus;
 import com.raytheon.uf.common.datastorage.records.IDataRecord;
 import com.raytheon.uf.common.pypies.request.AbstractRequest;
+import com.raytheon.uf.common.pypies.request.CopyRequest;
 import com.raytheon.uf.common.pypies.request.CreateDatasetRequest;
 import com.raytheon.uf.common.pypies.request.DatasetDataRequest;
 import com.raytheon.uf.common.pypies.request.DatasetNamesRequest;
@@ -25,7 +26,7 @@ import com.raytheon.uf.common.pypies.request.RepackRequest;
 import com.raytheon.uf.common.pypies.request.RetrieveRequest;
 import com.raytheon.uf.common.pypies.request.StoreRequest;
 import com.raytheon.uf.common.pypies.response.ErrorResponse;
-import com.raytheon.uf.common.pypies.response.RepackResponse;
+import com.raytheon.uf.common.pypies.response.FileActionResponse;
 import com.raytheon.uf.common.pypies.response.RetrieveResponse;
 import com.raytheon.uf.common.pypies.response.StoreResponse;
 import com.raytheon.uf.common.serialization.SerializationException;
@@ -374,14 +375,11 @@ public class PyPiesDataStore implements IDataStore {
     }

     @Override
-    public void repack(Compression compression, String outputDir,
-            String timestampCheck) throws StorageException {
+    public void repack(Compression compression) throws StorageException {
         RepackRequest req = new RepackRequest();
         req.setFilename(this.filename);
         req.setCompression(compression);
-        req.setOutputDir(outputDir);
-        req.setTimestampCheck(timestampCheck);
-        RepackResponse resp = (RepackResponse) sendRequest(req);
+        FileActionResponse resp = (FileActionResponse) sendRequest(req);
         // TODO do we really want to make this an exception?
         // reasoning is if the repack fails for some reason, the original file
         // is left as is, just isn't as efficiently packed
@@ -400,4 +398,35 @@ public class PyPiesDataStore implements IDataStore {
         }
     }

+    @Override
+    public void copy(String outputDir, Compression compression,
+            String timestampCheck, int minMillisSinceLastChange,
+            int maxMillisSinceLastChange) throws StorageException {
+        CopyRequest req = new CopyRequest();
+        req.setFilename(this.filename);
+        if (compression != null) {
+            req.setRepack(true);
+            req.setRepackCompression(compression);
+        } else {
+            req.setRepack(false);
+        }
+        req.setOutputDir(outputDir);
+        req.setTimestampCheck(timestampCheck);
+        req.setMinMillisSinceLastChange(minMillisSinceLastChange);
+        FileActionResponse resp = (FileActionResponse) sendRequest(req);
+        if (resp != null && resp.getFailedFiles() != null
+                && resp.getFailedFiles().length > 0) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Error copying the following files: ");
+            String[] failed = resp.getFailedFiles();
+            for (int i = 0; i < failed.length; i++) {
+                sb.append(failed[i]);
+                if (i < failed.length - 1) {
+                    sb.append(", ");
+                }
+            }
+            throw new StorageException(sb.toString(), null);
+        }
+    }
 }

View file

@@ -0,0 +1,152 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.pypies.request;

import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;

/**
 * TODO Add Description
 *
 * <pre>
 *
 * SOFTWARE HISTORY
 *
 * Date         Ticket#    Engineer    Description
 * ------------ ---------- ----------- --------------------------
 * Jan 17, 2012            rjpeter     Initial creation
 *
 * </pre>
 *
 * @author rjpeter
 * @version 1.0
 */
@DynamicSerialize
public class CopyRequest extends AbstractRequest {

    @DynamicSerializeElement
    private boolean repack;

    @DynamicSerializeElement
    private Compression repackCompression;

    @DynamicSerializeElement
    private String outputDir;

    @DynamicSerializeElement
    private String timestampCheck;

    @DynamicSerializeElement
    private int minMillisSinceLastChange;

    @DynamicSerializeElement
    private int maxMillisSinceLastChange;

    /**
     * @return the repack
     */
    public boolean isRepack() {
        return repack;
    }

    /**
     * @param repack
     *            the repack to set
     */
    public void setRepack(boolean repack) {
        this.repack = repack;
    }

    /**
     * @return the repackCompression
     */
    public Compression getRepackCompression() {
        return repackCompression;
    }

    /**
     * @param repackCompression
     *            the repackCompression to set
     */
    public void setRepackCompression(Compression repackCompression) {
        this.repackCompression = repackCompression;
    }

    /**
     * @return the outputDir
     */
    public String getOutputDir() {
        return outputDir;
    }

    /**
     * @param outputDir
     *            the outputDir to set
     */
    public void setOutputDir(String outputDir) {
        this.outputDir = outputDir;
    }

    /**
     * @return the timestampCheck
     */
    public String getTimestampCheck() {
        return timestampCheck;
    }

    /**
     * @param timestampCheck
     *            the timestampCheck to set
     */
    public void setTimestampCheck(String timestampCheck) {
        this.timestampCheck = timestampCheck;
    }

    /**
     * @return the minMillisSinceLastChange
     */
    public int getMinMillisSinceLastChange() {
        return minMillisSinceLastChange;
    }

    /**
     * @param minMillisSinceLastChange
     *            the minMillisSinceLastChange to set
     */
    public void setMinMillisSinceLastChange(int minMillisSinceLastChange) {
        this.minMillisSinceLastChange = minMillisSinceLastChange;
    }

    /**
     * @return the maxMillisSinceLastChange
     */
    public int getMaxMillisSinceLastChange() {
        return maxMillisSinceLastChange;
    }

    /**
     * @param maxMillisSinceLastChange
     *            the maxMillisSinceLastChange to set
     */
    public void setMaxMillisSinceLastChange(int maxMillisSinceLastChange) {
        this.maxMillisSinceLastChange = maxMillisSinceLastChange;
    }
}

View file

@@ -46,12 +46,6 @@ public class RepackRequest extends AbstractRequest {
     @DynamicSerializeElement
     private Compression compression;

-    @DynamicSerializeElement
-    private String outputDir;
-
-    @DynamicSerializeElement
-    private String timestampCheck;
-
     public Compression getCompression() {
         return compression;
     }
@@ -60,20 +54,4 @@ public class RepackRequest extends AbstractRequest {
         this.compression = compression;
     }

-    public String getOutputDir() {
-        return outputDir;
-    }
-
-    public void setOutputDir(String outputDir) {
-        this.outputDir = outputDir;
-    }
-
-    public String getTimestampCheck() {
-        return timestampCheck;
-    }
-
-    public void setTimestampCheck(String timestampCheck) {
-        this.timestampCheck = timestampCheck;
-    }
 }

View file

@@ -40,7 +40,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
  */
 @DynamicSerialize
-public class RepackResponse {
+public class FileActionResponse {

     @DynamicSerializeElement
     private String[] successfulFiles;

View file

@@ -78,7 +78,8 @@ public class FileUtil {
         StringBuilder fullPath = new StringBuilder();
         for (String component : pathComponents) {
             if ((fullPath.length() > 0)
-                    && (fullPath.charAt(fullPath.length() - 1) != File.separatorChar)) {
+                    && (fullPath.charAt(fullPath.length() - 1) != File.separatorChar)
+                    && (component.charAt(0) != File.separatorChar)) {
                 fullPath.append(File.separatorChar);
             }
             fullPath.append(component);
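
The new third clause stops the join from inserting a separator when the next component already starts with one, which previously produced doubled separators. A self-contained sketch of the changed loop (the `join` stand-in and example inputs are hypothetical; the real method lives in FileUtil):

```java
import java.io.File;

public final class JoinSketch {
    // hypothetical stand-in mirroring FileUtil.join after this change
    public static String join(String... pathComponents) {
        StringBuilder fullPath = new StringBuilder();
        for (String component : pathComponents) {
            if ((fullPath.length() > 0)
                    && (fullPath.charAt(fullPath.length() - 1) != File.separatorChar)
                    && (component.charAt(0) != File.separatorChar)) {
                fullPath.append(File.separatorChar);
            }
            fullPath.append(component);
        }
        return fullPath.toString();
    }

    public static void main(String[] args) {
        // with a Unix separatorChar this now prints "data/grid", not "data//grid"
        System.out.println(join("data", "/grid"));
    }
}
```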

View file

@@ -23,9 +23,11 @@
         <constructor-arg ref="databaseArchiver"/>
     </bean>

+    <!-- Need to register with databaseArchiver for archiving associated data store
     <bean id="datastoreArchiverRegistered" factory-bean="dataArchiver" factory-method="registerPluginArchiver">
         <constructor-arg ref="dataStoreArchiver"/>
     </bean>
+    -->

     <camelContext id="clusteredMaintenanceContext" xmlns="http://camel.apache.org/schema/spring"
         errorHandlerRef="errorHandler" autoStartup="false">

View file

@@ -84,7 +84,7 @@ public class DataStoreRepacker {
             String dir = hdf5Dir + File.separator + plugin;
             IDataStore ds = DataStoreFactory.getDataStore(new File(dir));
             try {
-                ds.repack(compression, null, "lastRepacked");
+                ds.repack(compression);
             } catch (StorageException e) {
                 statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage());
             }
@@ -109,5 +109,4 @@ public class DataStoreRepacker {
             }
         }
     }
-
 }

View file

@@ -51,7 +51,7 @@ import com.raytheon.uf.edex.maintenance.archive.config.DataArchiveConfig;
  * @version 1.0
  */
-public class DataStoreArchiver implements IPluginArchiver {
+public class DataStoreArchiver {

     private static final transient IUFStatusHandler statusHandler = UFStatus
             .getHandler(DataStoreArchiver.class);
@@ -67,19 +67,22 @@
         this.compression = Compression.valueOf(compression);
     }

-    @Override
-    public void archivePlugin(String pluginName, String archiveDir,
+    public void archiveFiles(String[] hdf5Files, String archiveDir,
             DataArchiveConfig conf) {
-        String dirToArchive = hdf5Dir + File.separator + pluginName;
-        IDataStore ds = DataStoreFactory.getDataStore(new File(dirToArchive));
-        String outputDir = archiveDir + File.separator + pluginName;
-        statusHandler.info("Archiving " + dirToArchive);
-        try {
-            ds.repack(compression, outputDir, "lastArchived");
-        } catch (StorageException e) {
-            statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage());
+        for (String hdf5File : hdf5Files) {
+            IDataStore ds = DataStoreFactory.getDataStore(new File(hdf5File));
+            String outputDir = archiveDir; // + dirs of hdf5 file
+
+            try {
+                // data must be older than 30 minutes, and no older than hours
+                // to keep hours need to lookup plugin and see if compression
+                // matches, or embed in configuration the compression level on
+                // archive, but would still need to lookup plugin
+                ds.copy(outputDir, compression, "lastArchived", 1800000,
+                        conf.getHoursToKeep() * 60000 + 1800000);
+            } catch (StorageException e) {
+                statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage());
+            }
         }
     }
 }
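
As a quick unit check of the window arguments passed to `copy` above, using a hypothetical `hoursToKeep` of 6 (not a value from this commit); note that 60,000 ms is one minute, while one hour is 3,600,000 ms:

```java
public class WindowCheck {
    public static void main(String[] args) {
        int hoursToKeep = 6; // hypothetical configuration value

        // minimum age: files changed in the last 30 minutes are skipped
        int minMillis = 1800000; // 1,800,000 ms = 30 minutes
        System.out.println(minMillis);

        // maximum age as computed above: 6 * 60,000 + 1,800,000
        int maxMillis = hoursToKeep * 60000 + 1800000;
        System.out.println(maxMillis); // 2160000 ms, i.e. 36 minutes

        // an hours-to-milliseconds window would instead be
        // 6 * 3,600,000 + 1,800,000 = 23,400,000 ms (6.5 hours)
        System.out.println(hoursToKeep * 3600000 + 1800000);
    }
}
```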

View file

@@ -41,10 +41,15 @@ import com.raytheon.uf.common.dataplugin.persist.DefaultPathProvider;
 import com.raytheon.uf.common.dataplugin.persist.IHDFFilePathProvider;
 import com.raytheon.uf.common.dataplugin.persist.IPersistable;
 import com.raytheon.uf.common.dataplugin.persist.PersistableDataObject;
+import com.raytheon.uf.common.datastorage.DataStoreFactory;
+import com.raytheon.uf.common.datastorage.IDataStore;
+import com.raytheon.uf.common.datastorage.StorageException;
+import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
 import com.raytheon.uf.common.serialization.SerializationException;
 import com.raytheon.uf.common.serialization.SerializationUtil;
 import com.raytheon.uf.common.status.IUFStatusHandler;
 import com.raytheon.uf.common.status.UFStatus;
 import com.raytheon.uf.common.status.UFStatus.Priority;
+import com.raytheon.uf.common.util.FileUtil;
 import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
 import com.raytheon.uf.edex.database.DataAccessLayerException;
@@ -145,6 +150,8 @@ public class DatabaseArchiver implements IPluginArchiver {
             return false;
         }

+        List<String> datastoreFilesToArchive = new ArrayList<String>();
+
         startTime = determineStartTime(pluginName, ct.getExtraInfo(),
                 runTime, dao, conf);
         Calendar endTime = determineEndTime(startTime, runTime);
@@ -154,18 +161,69 @@
             Map<String, List<PersistableDataObject>> pdosToSave = getPdosByFile(
                     pluginName, dao, pdoMap, startTime, endTime);

-            if (pdosToSave != null && pdosToSave.size() > 0) {
+            if (pdosToSave != null && !pdosToSave.isEmpty()) {
                 savePdoMap(pluginName, archivePath, pdosToSave);
+                for (Map.Entry<String, List<PersistableDataObject>> entry : pdosToSave
+                        .entrySet()) {
+                    List<PersistableDataObject> pdoList = entry.getValue();
+                    if (pdoList != null && !pdoList.isEmpty()
+                            && pdoList.get(0) instanceof IPersistable) {
+                        datastoreFilesToArchive.add(entry.getKey());
+                    }
+                }
             }

             startTime = endTime;
             endTime = determineEndTime(startTime, runTime);
         }

-        if (pdoMap != null && pdoMap.size() > 0) {
+        if (pdoMap != null && !pdoMap.isEmpty()) {
             savePdoMap(pluginName, archivePath, pdoMap);
         }

+        if (!datastoreFilesToArchive.isEmpty()) {
+            Compression compRequired = Compression.LZF;
+            PluginProperties props = PluginRegistry.getInstance()
+                    .getRegisteredObject(pluginName);
+
+            if (props != null && props.getCompression() != null) {
+                if (compRequired.equals(Compression.valueOf(props
+                        .getCompression()))) {
+                    // if plugin is already compressed to the correct level,
+                    // no additional compression required
+                    compRequired = null;
+                }
+            }
+
+            for (String dataStoreFile : datastoreFilesToArchive) {
+                IDataStore ds = DataStoreFactory.getDataStore(new File(
+                        FileUtil.join(PluginDao.HDF5_DIR, pluginName,
+                                dataStoreFile)));
+                int pathSep = dataStoreFile.lastIndexOf(File.separatorChar);
+                String outputDir = (pathSep > 0 ? FileUtil.join(
+                        archivePath, pluginName,
+                        dataStoreFile.substring(0, pathSep)) : FileUtil
+                        .join(archivePath, pluginName, dataStoreFile));
+
+                try {
+                    // data must be older than 30 minutes, and no older than
+                    // hours to keep
+                    // hours need to lookup plugin and see if compression
+                    // matches, or embed in configuration the compression
+                    // level on archive, but would still need to lookup plugin
+                    ds.copy(outputDir, compRequired, "lastArchived",
+                            1800000,
+                            conf.getHoursToKeep() * 60000 + 1800000);
+                } catch (StorageException e) {
+                    statusHandler.handle(Priority.PROBLEM,
+                            e.getLocalizedMessage());
+                }
+            }
+        }
+
         // set last archive time to startTime
         if (startTime != null) {
             lockHandler
@@ -212,7 +270,7 @@ public class DatabaseArchiver implements IPluginArchiver {
                 endTime);

         Set<String> newFileEntries = new HashSet<String>();
-        if (pdos != null && pdos.size() > 0) {
+        if (pdos != null && !pdos.isEmpty()) {
             if (pdos.get(0) instanceof IPersistable) {
                 IHDFFilePathProvider pathProvider = dao.pathProvider;
@@ -291,11 +349,8 @@ public class DatabaseArchiver implements IPluginArchiver {
                             + File.separator + entry.getKey();

                     // remove .h5
-                    int index = path.lastIndexOf('.');
-                    if (index > 0 && path.length() - index < 5) {
-                        // ensure its end of string in case extension is
-                        // dropped/changed
-                        path = path.substring(0, index);
+                    if (path.endsWith(".h5")) {
+                        path = path.substring(0, path.length() - 3);
                     }

                     path += ".bin.gz";
@@ -329,7 +384,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         Calendar startTime = null;

         // get previous run time
-        if (extraInfo != null && extraInfo.length() > 0) {
+        if (extraInfo != null && !extraInfo.isEmpty()) {
             try {
                 Date prevDate = DATE_FORMAT.parse(extraInfo);
View file

@@ -89,7 +89,7 @@ public class MpeLightningSrv {
     private QueryResultRow[] getMostRecentStrikes() throws EdexException {
         QueryResult rs = null;
         CoreDao coreDao = new CoreDao(DaoConfig.DEFAULT);
-        final String lgtSQL = "select datauri, hdffileid from binlightning "
+        final String lgtSQL = "select datauri from binlightning "
                 + "where reftime > (now()- interval \'30 minutes \')";
         try {
             rs = (QueryResult) coreDao.executeNativeSql(lgtSQL, true);
@@ -104,16 +104,13 @@ public class MpeLightningSrv {
      * Inserts a single record into ihfs's lightning table.
      *
      * @param dataURI
-     * @param hdfFileId
      * @throws EdexException
      */
-    private void ifhsInsertLightRecord(String dataURI, int hdfFileId)
-            throws EdexException {
+    private void ifhsInsertLightRecord(String dataURI) throws EdexException {
         int results = 0;
         try {
             // set up a lightning record
             BinLightningRecord ltngRec = new BinLightningRecord(dataURI);
-            ltngRec.setHdfFileId(hdfFileId);

             EnvProperties properties = PropertiesFactory.getInstance()
                     .getEnvProperties();
@@ -171,10 +168,11 @@ public class MpeLightningSrv {
                 String tuple = "(" + x_hgrids[j] + "," + y_hgrids[j]
                         + ", TIMESTAMP '" + ts.toString() + "' ," + strikes[j]
                         + ")";
-                if (j != x_hgrids.length - 1)
+                if (j != x_hgrids.length - 1) {
                     tuple = tuple + ",";
-                else
+                } else {
                     tuple = tuple + ";";
+                }
                 sql.append(tuple);
             }
@@ -205,12 +203,12 @@ public class MpeLightningSrv {
      */
     private void ifhsInsertMostRecentStrikes(QueryResultRow[] rows)
             throws EdexException {
-        if (rows.length == 0)
+        if (rows.length == 0) {
             logger.info("No new lightning records to insert in ifhs. ");
+        }
         for (QueryResultRow row : rows) {
             String dataURI = (String) row.getColumn(0);
-            Integer hdfFileId = (Integer) row.getColumn(1);
-            ifhsInsertLightRecord(dataURI, hdfFileId.intValue());
+            ifhsInsertLightRecord(dataURI);
         }
     }

View file

@@ -0,0 +1,74 @@
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
# File auto-generated against equivalent DynamicSerialize Java class
class CopyRequest(object):

    def __init__(self):
        self.repack = None
        self.repackCompression = None
        self.outputDir = None
        self.timestampCheck = None
        self.minMillisSinceLastChange = None
        self.maxMillisSinceLastChange = None
        self.filename = None

    def getRepack(self):
        return self.repack

    def setRepack(self, repack):
        self.repack = repack

    def getRepackCompression(self):
        return self.repackCompression

    def setRepackCompression(self, repackCompression):
        self.repackCompression = repackCompression

    def getOutputDir(self):
        return self.outputDir

    def setOutputDir(self, outputDir):
        self.outputDir = outputDir

    def getTimestampCheck(self):
        return self.timestampCheck

    def setTimestampCheck(self, timestampCheck):
        self.timestampCheck = timestampCheck

    def getMinMillisSinceLastChange(self):
        return self.minMillisSinceLastChange

    def setMinMillisSinceLastChange(self, minMillisSinceLastChange):
        self.minMillisSinceLastChange = minMillisSinceLastChange

    def getMaxMillisSinceLastChange(self):
        return self.maxMillisSinceLastChange

    def setMaxMillisSinceLastChange(self, maxMillisSinceLastChange):
        self.maxMillisSinceLastChange = maxMillisSinceLastChange

    def getFilename(self):
        return self.filename

    def setFilename(self, filename):
        self.filename = filename

View file

@@ -24,8 +24,6 @@ class RepackRequest(object):

     def __init__(self):
         self.compression = None
-        self.outputDir = None
-        self.timestampCheck = None
         self.filename = None

     def getCompression(self):
@@ -34,21 +32,8 @@
     def setCompression(self, compression):
         self.compression = compression

-    def getOutputDir(self):
-        return self.outputDir
-
-    def setOutputDir(self, outputDir):
-        self.outputDir = outputDir
-
-    def getTimestampCheck(self):
-        return self.timestampCheck
-
-    def setTimestampCheck(self, timestampCheck):
-        self.timestampCheck = timestampCheck
-
     def getFilename(self):
         return self.filename

     def setFilename(self, filename):
         self.filename = filename

View file

@@ -21,6 +21,7 @@
 # File auto-generated by PythonFileGenerator

 __all__ = [
+            'CopyRequest',
             'CreateDatasetRequest',
             'DatasetDataRequest',
             'DatasetNamesRequest',
@@ -32,6 +33,7 @@ __all__ = [
             'StoreRequest'
           ]

+from CopyRequest import CopyRequest
 from CreateDatasetRequest import CreateDatasetRequest
 from DatasetDataRequest import DatasetDataRequest
 from DatasetNamesRequest import DatasetNamesRequest
@@ -41,4 +43,3 @@ from GroupsRequest import GroupsRequest
 from RepackRequest import RepackRequest
 from RetrieveRequest import RetrieveRequest
 from StoreRequest import StoreRequest
-

View file

@@ -20,7 +20,7 @@
 # File auto-generated against equivalent DynamicSerialize Java class

-class RepackResponse(object):
+class FileActionResponse(object):

     def __init__(self):
         self.successfulFiles = None
@@ -37,4 +37,3 @@ class RepackResponse(object):
     def setFailedFiles(self, failedFiles):
         self.failedFiles = failedFiles
-

View file

@@ -23,14 +23,13 @@
 __all__ = [
            'DeleteResponse',
            'ErrorResponse',
-           'RepackResponse',
+           'FileActionResponse',
            'RetrieveResponse',
            'StoreResponse'
          ]

 from DeleteResponse import DeleteResponse
 from ErrorResponse import ErrorResponse
-from RepackResponse import RepackResponse
+from FileActionResponse import FileActionResponse
 from RetrieveResponse import RetrieveResponse
 from StoreResponse import StoreResponse

View file

@@ -55,7 +55,8 @@ datastoreMap = {
     DeleteRequest: (datastore.delete, "DeleteRequest"),
     DeleteFilesRequest: (datastore.deleteFiles, "DeleteFilesRequest"),
     CreateDatasetRequest: (datastore.createDataset, "CreateDatasetRequest"),
-    RepackRequest: (datastore.repack, "RepackRequest")
+    RepackRequest: (datastore.repack, "RepackRequest"),
+    CopyRequest: (datastore.copy, "CopyRequest")
 }

 @Request.application

View file

@@ -34,6 +34,7 @@
 #

 import h5py, os, numpy, pypies, re, logging, shutil, time, types
+import fnmatch
 import subprocess, stat #for h5repack
 from pypies import IDataStore, StorageException, NotImplementedException
 from pypies import MkDirLockManager as LockManager
@@ -617,33 +618,72 @@ class H5pyDataStore(IDataStore.IDataStore):
                 #id = group.id
                 #id.link(dataset.name, linkName, h5py.h5g.LINK_SOFT)

+    def copy(self, request):
+        resp = FileActionResponse()
+        file = request.getFilename()
+        pth = os.path.split(file)[0]
+        repack = request.getRepack()
+        action = self.__doCopy if not repack else self.__doRepack
+        self.__doFileAction(file, pth, request.getOutputDir(), action, resp, request.getRepackCompression(), request.getTimestampCheck())
+        return resp
+
+    def __doCopy(self, filepath, basePath, outputDir, compression):
+        shutil.copy(filepath, outputDir)
+        success = (os.path.isfile(os.path.join(outputDir, os.path.basename(filepath))))
+        return success
+
     def repack(self, request):
+        resp = FileActionResponse()
         pth = request.getFilename()
+        files = self.__listHdf5Files(pth)
+        compression = request.getCompression()
-        resp = RepackResponse()
-        if os.path.exists(pth):
-            self.__recurseRepack(pth, request, resp)
+        for f in files:
+            self.__doFileAction(f, pth, None, self.__doRepack, resp, compression)
         return resp

-    def __recurseRepack(self, pth, req, resp):
-        files = os.listdir(pth)
-        for f in files:
-            fullpath = pth + '/' + f
-            if os.path.isdir(fullpath):
-                self.__recurseRepack(fullpath, req, resp)
-            elif len(f) > 3 and f[-3:] == '.h5':
-                self.__doRepack(fullpath, req, resp)
+    def __listHdf5Files(self, pth):
+        results = []
+        for base, dirs, files in os.walk(pth):
+            goodfiles = fnmatch.filter(files, '*.h5')
+            results.extend(os.path.join(base, f) for f in goodfiles)
+        return results

-    def __doRepack(self, fullpath, req, response):
+    def __doRepack(self, filepath, basePath, outDir, compression):
+        # call h5repack to repack the file
+        if outDir is None:
+            repackedFullPath = filepath + '.repacked'
+        else:
+            repackedFullPath = filepath.replace(basePath, outDir)
+        cmd = ['h5repack', '-f', compression, filepath, repackedFullPath]
+        ret = subprocess.call(cmd)
+        success = (ret == 0)
+        if success:
+            # repack was successful, replace the old file if we did it in the
+            # same directory, otherwise leave it alone
+            if outDir is None:
+                os.remove(filepath)
+                os.rename(repackedFullPath, filepath)
+                os.chmod(filepath, stat.S_IWUSR | stat.S_IWGRP | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
+        else:
+            # remove failed new file if there was one
+            if os.path.exists(repackedFullPath):
+                os.remove(repackedFullPath)
+            if outDir is not None:
+                # repack failed, but they wanted the data in a different
+                # directory, so just copy the original data without the repack
+                shutil.copy(filepath, repackedFullPath)
+        return success
+
+    def __doFileAction(self, filepath, basePath, outputDir, fileAction, response, compression='NONE', timestampCheck=None):
         lock = None
         try:
-            f, lock = self.__openFile(fullpath, 'a')
+            f, lock = self.__openFile(filepath, 'a')
             proceedWithRepack = True
-            timestampCheck = req.getTimestampCheck()
             if timestampCheck:
                 if timestampCheck in f.attrs.keys():
                     lastRepacked = f.attrs[timestampCheck]
-                    lastModified = os.stat(fullpath).st_mtime
+                    lastModified = os.stat(filepath).st_mtime
                     if lastRepacked > lastModified:
                         proceedWithRepack = False
             if proceedWithRepack:
@@ -654,30 +694,7 @@ class H5pyDataStore(IDataStore.IDataStore):
                     f.attrs[timestampCheck] = time.time() + 30
             f.close()

-            # call h5repack to repack the file
-            outDir = req.getOutputDir()
-            if outDir is None:
-                repackedFullPath = fullpath + '.repacked'
-            else:
-                repackedFullPath = fullpath.replace(req.getFilename(), outDir)
-            cmd = ['h5repack', '-f', req.getCompression(), fullpath, repackedFullPath]
-            ret = subprocess.call(cmd)
-            success = (ret == 0)
-            if success:
-                # repack was successful, replace the old file if we did it in the
-                # same directory, otherwise leave it alone
-                if outDir is None:
-                    os.remove(fullpath)
-                    os.rename(repackedFullPath, fullpath)
-                    os.chmod(fullpath, stat.S_IWUSR | stat.S_IWGRP | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
-            else:
-                # remove failed new file if there was one
-                if os.path.exists(repackedFullPath):
-                    os.remove(repackedFullPath)
-                if outDir is not None:
-                    # repack failed, but they wanted the data in a different
-                    # directory, so just copy the original data without the repack
-                    shutil.copy(fullpath, repackedFullPath)
+            success = fileAction(filepath, basePath, outputDir, compression)

             # update response
             if success:
@@ -688,20 +705,20 @@ class H5pyDataStore(IDataStore.IDataStore):
                 setter = response.setFailedFiles
             responseList = getter()
             if responseList:
-                responseList += [fullpath]
+                responseList += [filepath]
             else:
-                responseList = [fullpath]
+                responseList = [filepath]
             setter(responseList)
         except Exception, e:
-            logger.warn("Error repacking file " + fullpath + ": " + str(e))
+            logger.warn("Error repacking file " + filepath + ": " + str(e))
             failed = response.getFailedFiles()
             if failed:
-                failed += [fullpath]
+                failed += [filepath]
             else:
-                failed = [fullpath]
+                failed = [filepath]
             response.setFailedFiles(failed)
         finally:
             if lock:
-                LockManager.releaseLock(lock)
+                LockManager.releaseLock(lock)