Issue #20: Committing refactored archiver capability
Change-Id: Ifc07621d2ad7b315a8d8391ae6ae6608516ba3bf Issue #20: Address peer review comments from previous commit. Change-Id: Ifc07621d2ad7b315a8d8391ae6ae6608516ba3bf Former-commit-id:535821e5b8
[formerlybb1ac9959b
] [formerly061966f243
[formerly 47750dfd5e387238f409ead55549cbb24f5afbe1]] Former-commit-id:061966f243
Former-commit-id:9f416558fa
This commit is contained in:
parent
724db373e9
commit
3e42bc7d2d
23 changed files with 486 additions and 146 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -8,3 +8,5 @@ bin-test/
|
|||
|
||||
/Installer.rpm
|
||||
/Installer.rpm
|
||||
|
||||
*.pyo
|
||||
|
|
|
@ -245,8 +245,14 @@ public class RemoteDataStore implements IDataStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void repack(Compression compression, String outputDir,
|
||||
String timestampCheck) throws StorageException {
|
||||
public void repack(Compression compression) throws StorageException {
|
||||
throw new StorageException("Operation not supported", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copy(String outputDir, Compression compression,
|
||||
String timestampCheck, int minMillisSinceLastChange,
|
||||
int maxMillisSinceLastChange) throws StorageException {
|
||||
throw new StorageException("Operation not supported", null);
|
||||
}
|
||||
|
||||
|
|
|
@ -379,13 +379,26 @@ public class CachingDataStore implements IDataStore {
|
|||
* (non-Javadoc)
|
||||
*
|
||||
* @see
|
||||
* com.raytheon.uf.common.datastorage.IDataStore#repack(java.lang.String,
|
||||
* com.raytheon.uf.common.datastorage.StorageProperties.Compression)
|
||||
* com.raytheon.uf.common.datastorage.IDataStore#repack(com.raytheon.uf.
|
||||
* common.datastorage.StorageProperties.Compression)
|
||||
*/
|
||||
@Override
|
||||
public void repack(Compression compression, String outputDir,
|
||||
String timestampCheck) throws StorageException {
|
||||
delegate.repack(compression, outputDir, timestampCheck);
|
||||
public void repack(Compression compression) throws StorageException {
|
||||
delegate.repack(compression);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see com.raytheon.uf.common.datastorage.IDataStore#copy(java.lang.String,
|
||||
* com.raytheon.uf.common.datastorage.StorageProperties.Compression,
|
||||
* java.lang.String, int, int)
|
||||
*/
|
||||
@Override
|
||||
public void copy(String outputDir, Compression compression,
|
||||
String timestampCheck, int minMillisSinceLastChange,
|
||||
int maxMillisSinceLastChange) throws StorageException {
|
||||
delegate.copy(outputDir, compression, timestampCheck,
|
||||
minMillisSinceLastChange, maxMillisSinceLastChange);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,4 +26,4 @@ acarssounding.cron=00+10,30,50+*+*+*+?
|
|||
gfe.cron=0+15+*+*+*+?
|
||||
repack.cron=0+20+*+*+*+?
|
||||
# runs database and hdf5 archive for archive server to pull data from
|
||||
archive.cron=0+0+*+*+*+?
|
||||
archive.cron=0+40+*+*+*+?
|
|
@ -1960,8 +1960,14 @@ public class HDF5DataStore implements IDataStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void repack(Compression compression, String outputDir,
|
||||
String timestampCheck) throws StorageException {
|
||||
public void repack(Compression compression) throws StorageException {
|
||||
throw new StorageException("Operation not supported", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copy(String outputDir, Compression compression,
|
||||
String timestampCheck, int minMillisSinceLastChange,
|
||||
int maxMillisSinceLastChange) throws StorageException {
|
||||
throw new StorageException("Operation not supported", null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -257,10 +257,20 @@ public interface IDataStore extends ISerializableObject {
|
|||
*
|
||||
* @param compression
|
||||
* the type of compression to repack with
|
||||
*/
|
||||
public void repack(Compression compression) throws StorageException;
|
||||
|
||||
/**
|
||||
* Recursively copies all files of a certain directory. If compression is
|
||||
* specified the file will be repacked to the specified compression.
|
||||
* Presumes that the IDataStore instance is tied to a directory, not a
|
||||
* specific file.
|
||||
*
|
||||
* @param outputDir
|
||||
* the output directory to put the repacked files, or null if the
|
||||
* same dir. If the same dir is used, it will delete the original
|
||||
* file.
|
||||
* the output directory to put the copied files
|
||||
* @param compression
|
||||
* If specified will repack the output file with a given
|
||||
* compression
|
||||
* @param timestampCheck
|
||||
* if not null, the attribute to check on the file for a
|
||||
* timestamp of the last time this particular action was run.
|
||||
|
@ -269,8 +279,18 @@ public interface IDataStore extends ISerializableObject {
|
|||
* future requests for the same file will check this attribute
|
||||
* and if the file has not been modified since last run, the file
|
||||
* will be skipped.
|
||||
* @param minMillisSinceLastChange
|
||||
* if greater than 0, the last modified time on the file cannot
|
||||
* be within minMillisSinceLastChange from current time. This is
|
||||
* used to not repack files that have changed within a recent
|
||||
* threshold.
|
||||
* @param maxMillisSinceLastChange
|
||||
* if greater than 0, the last modified time on the file must be
|
||||
* within maxMillisSinceLastChange from current time. This is
|
||||
* used to ignore files that have not changed within a recent
|
||||
* threshold.
|
||||
*/
|
||||
public void repack(Compression compression, String outputDir,
|
||||
String timestampCheck) throws StorageException;
|
||||
|
||||
public void copy(String outputDir, Compression compression,
|
||||
String timestampCheck, int minMillisSinceLastChange,
|
||||
int maxMillisSinceLastChange) throws StorageException;
|
||||
}
|
|
@ -15,6 +15,7 @@ import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
|
|||
import com.raytheon.uf.common.datastorage.StorageStatus;
|
||||
import com.raytheon.uf.common.datastorage.records.IDataRecord;
|
||||
import com.raytheon.uf.common.pypies.request.AbstractRequest;
|
||||
import com.raytheon.uf.common.pypies.request.CopyRequest;
|
||||
import com.raytheon.uf.common.pypies.request.CreateDatasetRequest;
|
||||
import com.raytheon.uf.common.pypies.request.DatasetDataRequest;
|
||||
import com.raytheon.uf.common.pypies.request.DatasetNamesRequest;
|
||||
|
@ -25,7 +26,7 @@ import com.raytheon.uf.common.pypies.request.RepackRequest;
|
|||
import com.raytheon.uf.common.pypies.request.RetrieveRequest;
|
||||
import com.raytheon.uf.common.pypies.request.StoreRequest;
|
||||
import com.raytheon.uf.common.pypies.response.ErrorResponse;
|
||||
import com.raytheon.uf.common.pypies.response.RepackResponse;
|
||||
import com.raytheon.uf.common.pypies.response.FileActionResponse;
|
||||
import com.raytheon.uf.common.pypies.response.RetrieveResponse;
|
||||
import com.raytheon.uf.common.pypies.response.StoreResponse;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
|
@ -374,14 +375,11 @@ public class PyPiesDataStore implements IDataStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void repack(Compression compression, String outputDir,
|
||||
String timestampCheck) throws StorageException {
|
||||
public void repack(Compression compression) throws StorageException {
|
||||
RepackRequest req = new RepackRequest();
|
||||
req.setFilename(this.filename);
|
||||
req.setCompression(compression);
|
||||
req.setOutputDir(outputDir);
|
||||
req.setTimestampCheck(timestampCheck);
|
||||
RepackResponse resp = (RepackResponse) sendRequest(req);
|
||||
FileActionResponse resp = (FileActionResponse) sendRequest(req);
|
||||
// TODO do we really want to make this an exception?
|
||||
// reasoning is if the repack fails for some reason, the original file
|
||||
// is left as is, just isn't as efficiently packed
|
||||
|
@ -400,4 +398,35 @@ public class PyPiesDataStore implements IDataStore {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copy(String outputDir, Compression compression,
|
||||
String timestampCheck, int minMillisSinceLastChange,
|
||||
int maxMillisSinceLastChange) throws StorageException {
|
||||
CopyRequest req = new CopyRequest();
|
||||
req.setFilename(this.filename);
|
||||
if (compression != null) {
|
||||
req.setRepack(true);
|
||||
req.setRepackCompression(compression);
|
||||
} else {
|
||||
req.setRepack(false);
|
||||
}
|
||||
req.setOutputDir(outputDir);
|
||||
req.setTimestampCheck(timestampCheck);
|
||||
req.setMinMillisSinceLastChange(minMillisSinceLastChange);
|
||||
FileActionResponse resp = (FileActionResponse) sendRequest(req);
|
||||
|
||||
if (resp != null && resp.getFailedFiles() != null
|
||||
&& resp.getFailedFiles().length > 0) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Error copying the following files: ");
|
||||
String[] failed = resp.getFailedFiles();
|
||||
for (int i = 0; i < failed.length; i++) {
|
||||
sb.append(failed[i]);
|
||||
if (i < failed.length - 1) {
|
||||
sb.append(", ");
|
||||
}
|
||||
}
|
||||
throw new StorageException(sb.toString(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.pypies.request;
|
||||
|
||||
import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
||||
|
||||
/**
|
||||
* TODO Add Description
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 17, 2012 rjpeter Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author rjpeter
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
@DynamicSerialize
|
||||
public class CopyRequest extends AbstractRequest {
|
||||
@DynamicSerializeElement
|
||||
private boolean repack;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private Compression repackCompression;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String outputDir;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String timestampCheck;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private int minMillisSinceLastChange;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private int maxMillisSinceLastChange;
|
||||
|
||||
/**
|
||||
* @return the repack
|
||||
*/
|
||||
public boolean isRepack() {
|
||||
return repack;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param repack
|
||||
* the repack to set
|
||||
*/
|
||||
public void setRepack(boolean repack) {
|
||||
this.repack = repack;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the repackCompression
|
||||
*/
|
||||
public Compression getRepackCompression() {
|
||||
return repackCompression;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param repackCompression
|
||||
* the repackCompression to set
|
||||
*/
|
||||
public void setRepackCompression(Compression repackCompression) {
|
||||
this.repackCompression = repackCompression;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the outputDir
|
||||
*/
|
||||
public String getOutputDir() {
|
||||
return outputDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param outputDir
|
||||
* the outputDir to set
|
||||
*/
|
||||
public void setOutputDir(String outputDir) {
|
||||
this.outputDir = outputDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the timestampCheck
|
||||
*/
|
||||
public String getTimestampCheck() {
|
||||
return timestampCheck;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param timestampCheck
|
||||
* the timestampCheck to set
|
||||
*/
|
||||
public void setTimestampCheck(String timestampCheck) {
|
||||
this.timestampCheck = timestampCheck;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the minMillisSinceLastChange
|
||||
*/
|
||||
public int getMinMillisSinceLastChange() {
|
||||
return minMillisSinceLastChange;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param minMillisSinceLastChange
|
||||
* the minMillisSinceLastChange to set
|
||||
*/
|
||||
public void setMinMillisSinceLastChange(int minMillisSinceLastChange) {
|
||||
this.minMillisSinceLastChange = minMillisSinceLastChange;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxMillisSinceLastChange
|
||||
*/
|
||||
public int getMaxMillisSinceLastChange() {
|
||||
return maxMillisSinceLastChange;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxMillisSinceLastChange
|
||||
* the maxMillisSinceLastChange to set
|
||||
*/
|
||||
public void setMaxMillisSinceLastChange(int maxMillisSinceLastChange) {
|
||||
this.maxMillisSinceLastChange = maxMillisSinceLastChange;
|
||||
}
|
||||
}
|
|
@ -46,12 +46,6 @@ public class RepackRequest extends AbstractRequest {
|
|||
@DynamicSerializeElement
|
||||
private Compression compression;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String outputDir;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String timestampCheck;
|
||||
|
||||
public Compression getCompression() {
|
||||
return compression;
|
||||
}
|
||||
|
@ -60,20 +54,4 @@ public class RepackRequest extends AbstractRequest {
|
|||
this.compression = compression;
|
||||
}
|
||||
|
||||
public String getOutputDir() {
|
||||
return outputDir;
|
||||
}
|
||||
|
||||
public void setOutputDir(String outputDir) {
|
||||
this.outputDir = outputDir;
|
||||
}
|
||||
|
||||
public String getTimestampCheck() {
|
||||
return timestampCheck;
|
||||
}
|
||||
|
||||
public void setTimestampCheck(String timestampCheck) {
|
||||
this.timestampCheck = timestampCheck;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
|||
*/
|
||||
|
||||
@DynamicSerialize
|
||||
public class RepackResponse {
|
||||
public class FileActionResponse {
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String[] successfulFiles;
|
|
@ -78,7 +78,8 @@ public class FileUtil {
|
|||
StringBuilder fullPath = new StringBuilder();
|
||||
for (String component : pathComponents) {
|
||||
if ((fullPath.length() > 0)
|
||||
&& (fullPath.charAt(fullPath.length() - 1) != File.separatorChar)) {
|
||||
&& (fullPath.charAt(fullPath.length() - 1) != File.separatorChar)
|
||||
&& (component.charAt(0) != File.separatorChar)) {
|
||||
fullPath.append(File.separatorChar);
|
||||
}
|
||||
fullPath.append(component);
|
||||
|
|
|
@ -23,9 +23,11 @@
|
|||
<constructor-arg ref="databaseArchiver"/>
|
||||
</bean>
|
||||
|
||||
<!-- Need to register with databaseArchiver for archiving associated data store
|
||||
<bean id="datastoreArchiverRegistered" factory-bean="dataArchiver" factory-method="registerPluginArchiver">
|
||||
<constructor-arg ref="dataStoreArchiver"/>
|
||||
</bean>
|
||||
-->
|
||||
|
||||
<camelContext id="clusteredMaintenanceContext" xmlns="http://camel.apache.org/schema/spring"
|
||||
errorHandlerRef="errorHandler" autoStartup="false">
|
||||
|
|
|
@ -84,7 +84,7 @@ public class DataStoreRepacker {
|
|||
String dir = hdf5Dir + File.separator + plugin;
|
||||
IDataStore ds = DataStoreFactory.getDataStore(new File(dir));
|
||||
try {
|
||||
ds.repack(compression, null, "lastRepacked");
|
||||
ds.repack(compression);
|
||||
} catch (StorageException e) {
|
||||
statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage());
|
||||
}
|
||||
|
@ -109,5 +109,4 @@ public class DataStoreRepacker {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ import com.raytheon.uf.edex.maintenance.archive.config.DataArchiveConfig;
|
|||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class DataStoreArchiver implements IPluginArchiver {
|
||||
public class DataStoreArchiver {
|
||||
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(DataStoreArchiver.class);
|
||||
|
@ -67,19 +67,22 @@ public class DataStoreArchiver implements IPluginArchiver {
|
|||
this.compression = Compression.valueOf(compression);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void archivePlugin(String pluginName, String archiveDir,
|
||||
public void archiveFiles(String[] hdf5Files, String archiveDir,
|
||||
DataArchiveConfig conf) {
|
||||
String dirToArchive = hdf5Dir + File.separator + pluginName;
|
||||
IDataStore ds = DataStoreFactory.getDataStore(new File(dirToArchive));
|
||||
String outputDir = archiveDir + File.separator + pluginName;
|
||||
statusHandler.info("Archiving " + dirToArchive);
|
||||
for (String hdf5File : hdf5Files) {
|
||||
IDataStore ds = DataStoreFactory.getDataStore(new File(hdf5File));
|
||||
String outputDir = archiveDir; // + dirs of hdf5 file
|
||||
|
||||
try {
|
||||
ds.repack(compression, outputDir, "lastArchived");
|
||||
} catch (StorageException e) {
|
||||
statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage());
|
||||
try {
|
||||
// data must be older than 30 minutes, and no older than hours
|
||||
// to keep hours need to lookup plugin and see if compression
|
||||
// matches, or embed in configuration the compression level on
|
||||
// archive, but would still need to lookup plugin
|
||||
ds.copy(outputDir, compression, "lastArchived", 1800000,
|
||||
conf.getHoursToKeep() * 60000 + 1800000);
|
||||
} catch (StorageException e) {
|
||||
statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,10 +41,15 @@ import com.raytheon.uf.common.dataplugin.persist.DefaultPathProvider;
|
|||
import com.raytheon.uf.common.dataplugin.persist.IHDFFilePathProvider;
|
||||
import com.raytheon.uf.common.dataplugin.persist.IPersistable;
|
||||
import com.raytheon.uf.common.dataplugin.persist.PersistableDataObject;
|
||||
import com.raytheon.uf.common.datastorage.DataStoreFactory;
|
||||
import com.raytheon.uf.common.datastorage.IDataStore;
|
||||
import com.raytheon.uf.common.datastorage.StorageException;
|
||||
import com.raytheon.uf.common.datastorage.StorageProperties.Compression;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
|
@ -145,6 +150,8 @@ public class DatabaseArchiver implements IPluginArchiver {
|
|||
return false;
|
||||
}
|
||||
|
||||
List<String> datastoreFilesToArchive = new ArrayList<String>();
|
||||
|
||||
startTime = determineStartTime(pluginName, ct.getExtraInfo(),
|
||||
runTime, dao, conf);
|
||||
Calendar endTime = determineEndTime(startTime, runTime);
|
||||
|
@ -154,18 +161,69 @@ public class DatabaseArchiver implements IPluginArchiver {
|
|||
Map<String, List<PersistableDataObject>> pdosToSave = getPdosByFile(
|
||||
pluginName, dao, pdoMap, startTime, endTime);
|
||||
|
||||
if (pdosToSave != null && pdosToSave.size() > 0) {
|
||||
if (pdosToSave != null && !pdosToSave.isEmpty()) {
|
||||
savePdoMap(pluginName, archivePath, pdosToSave);
|
||||
for (Map.Entry<String, List<PersistableDataObject>> entry : pdosToSave
|
||||
.entrySet()) {
|
||||
List<PersistableDataObject> pdoList = entry.getValue();
|
||||
if (pdoList != null && !pdoList.isEmpty()
|
||||
&& pdoList.get(0) instanceof IPersistable) {
|
||||
datastoreFilesToArchive.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
startTime = endTime;
|
||||
endTime = determineEndTime(startTime, runTime);
|
||||
}
|
||||
|
||||
if (pdoMap != null && pdoMap.size() > 0) {
|
||||
if (pdoMap != null && !pdoMap.isEmpty()) {
|
||||
savePdoMap(pluginName, archivePath, pdoMap);
|
||||
}
|
||||
|
||||
if (!datastoreFilesToArchive.isEmpty()) {
|
||||
Compression compRequired = Compression.LZF;
|
||||
|
||||
PluginProperties props = PluginRegistry.getInstance()
|
||||
.getRegisteredObject(pluginName);
|
||||
|
||||
if (props != null && props.getCompression() != null) {
|
||||
if (compRequired.equals(Compression.valueOf(props
|
||||
.getCompression()))) {
|
||||
// if plugin is already compressed to the correct level,
|
||||
// no additional compression required
|
||||
compRequired = null;
|
||||
}
|
||||
}
|
||||
|
||||
for (String dataStoreFile : datastoreFilesToArchive) {
|
||||
IDataStore ds = DataStoreFactory.getDataStore(new File(
|
||||
FileUtil.join(PluginDao.HDF5_DIR, pluginName,
|
||||
dataStoreFile)));
|
||||
int pathSep = dataStoreFile.lastIndexOf(File.separatorChar);
|
||||
String outputDir = (pathSep > 0 ? FileUtil.join(
|
||||
archivePath, pluginName,
|
||||
dataStoreFile.substring(0, pathSep)) : FileUtil
|
||||
.join(archivePath, pluginName, dataStoreFile));
|
||||
|
||||
try {
|
||||
// data must be older than 30 minutes, and no older than
|
||||
// hours
|
||||
// to keep hours need to lookup plugin and see if
|
||||
// compression
|
||||
// matches, or embed in configuration the compression
|
||||
// level on
|
||||
// archive, but would still need to lookup plugin
|
||||
ds.copy(outputDir, compRequired, "lastArchived",
|
||||
1800000,
|
||||
conf.getHoursToKeep() * 60000 + 1800000);
|
||||
} catch (StorageException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
e.getLocalizedMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set last archive time to startTime
|
||||
if (startTime != null) {
|
||||
lockHandler
|
||||
|
@ -212,7 +270,7 @@ public class DatabaseArchiver implements IPluginArchiver {
|
|||
endTime);
|
||||
|
||||
Set<String> newFileEntries = new HashSet<String>();
|
||||
if (pdos != null && pdos.size() > 0) {
|
||||
if (pdos != null && !pdos.isEmpty()) {
|
||||
if (pdos.get(0) instanceof IPersistable) {
|
||||
IHDFFilePathProvider pathProvider = dao.pathProvider;
|
||||
|
||||
|
@ -291,11 +349,8 @@ public class DatabaseArchiver implements IPluginArchiver {
|
|||
+ File.separator + entry.getKey();
|
||||
|
||||
// remove .h5
|
||||
int index = path.lastIndexOf('.');
|
||||
if (index > 0 && path.length() - index < 5) {
|
||||
// ensure its end of string in case extension is
|
||||
// dropped/changed
|
||||
path = path.substring(0, index);
|
||||
if (path.endsWith(".h5")) {
|
||||
path = path.substring(0, path.length() - 3);
|
||||
}
|
||||
|
||||
path += ".bin.gz";
|
||||
|
@ -329,7 +384,7 @@ public class DatabaseArchiver implements IPluginArchiver {
|
|||
Calendar startTime = null;
|
||||
|
||||
// get previous run time
|
||||
if (extraInfo != null && extraInfo.length() > 0) {
|
||||
if (extraInfo != null && !extraInfo.isEmpty()) {
|
||||
try {
|
||||
Date prevDate = DATE_FORMAT.parse(extraInfo);
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ public class MpeLightningSrv {
|
|||
private QueryResultRow[] getMostRecentStrikes() throws EdexException {
|
||||
QueryResult rs = null;
|
||||
CoreDao coreDao = new CoreDao(DaoConfig.DEFAULT);
|
||||
final String lgtSQL = "select datauri, hdffileid from binlightning "
|
||||
final String lgtSQL = "select datauri from binlightning "
|
||||
+ "where reftime > (now()- interval \'30 minutes \')";
|
||||
try {
|
||||
rs = (QueryResult) coreDao.executeNativeSql(lgtSQL, true);
|
||||
|
@ -104,16 +104,13 @@ public class MpeLightningSrv {
|
|||
* Inserts a single record into ihfs's lightning table.
|
||||
*
|
||||
* @param dataURI
|
||||
* @param hdfFileId
|
||||
* @throws EdexException
|
||||
*/
|
||||
private void ifhsInsertLightRecord(String dataURI, int hdfFileId)
|
||||
throws EdexException {
|
||||
private void ifhsInsertLightRecord(String dataURI) throws EdexException {
|
||||
int results = 0;
|
||||
try {
|
||||
// set up a lightning record
|
||||
BinLightningRecord ltngRec = new BinLightningRecord(dataURI);
|
||||
ltngRec.setHdfFileId(hdfFileId);
|
||||
|
||||
EnvProperties properties = PropertiesFactory.getInstance()
|
||||
.getEnvProperties();
|
||||
|
@ -171,10 +168,11 @@ public class MpeLightningSrv {
|
|||
String tuple = "(" + x_hgrids[j] + "," + y_hgrids[j]
|
||||
+ ", TIMESTAMP '" + ts.toString() + "' ," + strikes[j]
|
||||
+ ")";
|
||||
if (j != x_hgrids.length - 1)
|
||||
if (j != x_hgrids.length - 1) {
|
||||
tuple = tuple + ",";
|
||||
else
|
||||
} else {
|
||||
tuple = tuple + ";";
|
||||
}
|
||||
sql.append(tuple);
|
||||
}
|
||||
|
||||
|
@ -205,12 +203,12 @@ public class MpeLightningSrv {
|
|||
*/
|
||||
private void ifhsInsertMostRecentStrikes(QueryResultRow[] rows)
|
||||
throws EdexException {
|
||||
if (rows.length == 0)
|
||||
if (rows.length == 0) {
|
||||
logger.info("No new lightning records to insert in ifhs. ");
|
||||
}
|
||||
for (QueryResultRow row : rows) {
|
||||
String dataURI = (String) row.getColumn(0);
|
||||
Integer hdfFileId = (Integer) row.getColumn(1);
|
||||
ifhsInsertLightRecord(dataURI, hdfFileId.intValue());
|
||||
ifhsInsertLightRecord(dataURI);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
##
|
||||
# This software was developed and / or modified by Raytheon Company,
|
||||
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
#
|
||||
# U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
# This software product contains export-restricted data whose
|
||||
# export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
# to non-U.S. persons whether in the United States or abroad requires
|
||||
# an export license or other authorization.
|
||||
#
|
||||
# Contractor Name: Raytheon Company
|
||||
# Contractor Address: 6825 Pine Street, Suite 340
|
||||
# Mail Stop B8
|
||||
# Omaha, NE 68106
|
||||
# 402.291.0100
|
||||
#
|
||||
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
# further licensing information.
|
||||
##
|
||||
|
||||
# File auto-generated against equivalent DynamicSerialize Java class
|
||||
|
||||
class CopyRequest(object):
|
||||
|
||||
def __init__(self):
|
||||
self.repack = None
|
||||
self.repackCompression = None
|
||||
self.outputDir = None
|
||||
self.timestampCheck = None
|
||||
self.minMillisSinceLastChange = None
|
||||
self.maxMillisSinceLastChange = None
|
||||
self.filename = None
|
||||
|
||||
def getRepack(self):
|
||||
return self.repack
|
||||
|
||||
def setRepack(self, repack):
|
||||
self.repack = repack
|
||||
|
||||
def getRepackCompression(self):
|
||||
return self.repackCompression
|
||||
|
||||
def setRepackCompression(self, repackCompression):
|
||||
self.repackCompression = repackCompression
|
||||
|
||||
def getOutputDir(self):
|
||||
return self.outputDir
|
||||
|
||||
def setOutputDir(self, outputDir):
|
||||
self.outputDir = outputDir
|
||||
|
||||
def getTimestampCheck(self):
|
||||
return self.timestampCheck
|
||||
|
||||
def setTimestampCheck(self, timestampCheck):
|
||||
self.timestampCheck = timestampCheck
|
||||
|
||||
def getMinMillisSinceLastChange(self):
|
||||
return self.minMillisSinceLastChange
|
||||
|
||||
def setMinMillisSinceLastChange(self, minMillisSinceLastChange):
|
||||
self.minMillisSinceLastChange = minMillisSinceLastChange
|
||||
|
||||
def getMaxMillisSinceLastChange(self):
|
||||
return self.maxMillisSinceLastChange
|
||||
|
||||
def setMaxMillisSinceLastChange(self, maxMillisSinceLastChange):
|
||||
self.maxMillisSinceLastChange = maxMillisSinceLastChange
|
||||
|
||||
def getFilename(self):
|
||||
return self.filename
|
||||
|
||||
def setFilename(self, filename):
|
||||
self.filename = filename
|
|
@ -24,8 +24,6 @@ class RepackRequest(object):
|
|||
|
||||
def __init__(self):
|
||||
self.compression = None
|
||||
self.outputDir = None
|
||||
self.timestampCheck = None
|
||||
self.filename = None
|
||||
|
||||
def getCompression(self):
|
||||
|
@ -34,21 +32,8 @@ class RepackRequest(object):
|
|||
def setCompression(self, compression):
|
||||
self.compression = compression
|
||||
|
||||
def getOutputDir(self):
|
||||
return self.outputDir
|
||||
|
||||
def setOutputDir(self, outputDir):
|
||||
self.outputDir = outputDir
|
||||
|
||||
def getTimestampCheck(self):
|
||||
return self.timestampCheck
|
||||
|
||||
def setTimestampCheck(self, timestampCheck):
|
||||
self.timestampCheck = timestampCheck
|
||||
|
||||
def getFilename(self):
|
||||
return self.filename
|
||||
|
||||
def setFilename(self, filename):
|
||||
self.filename = filename
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
# File auto-generated by PythonFileGenerator
|
||||
|
||||
__all__ = [
|
||||
'CopyRequest',
|
||||
'CreateDatasetRequest',
|
||||
'DatasetDataRequest',
|
||||
'DatasetNamesRequest',
|
||||
|
@ -32,6 +33,7 @@ __all__ = [
|
|||
'StoreRequest'
|
||||
]
|
||||
|
||||
from CopyRequest import CopyRequest
|
||||
from CreateDatasetRequest import CreateDatasetRequest
|
||||
from DatasetDataRequest import DatasetDataRequest
|
||||
from DatasetNamesRequest import DatasetNamesRequest
|
||||
|
@ -41,4 +43,3 @@ from GroupsRequest import GroupsRequest
|
|||
from RepackRequest import RepackRequest
|
||||
from RetrieveRequest import RetrieveRequest
|
||||
from StoreRequest import StoreRequest
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
# File auto-generated against equivalent DynamicSerialize Java class
|
||||
|
||||
class RepackResponse(object):
|
||||
class FileActionResponse(object):
|
||||
|
||||
def __init__(self):
|
||||
self.successfulFiles = None
|
||||
|
@ -37,4 +37,3 @@ class RepackResponse(object):
|
|||
|
||||
def setFailedFiles(self, failedFiles):
|
||||
self.failedFiles = failedFiles
|
||||
|
|
@ -23,14 +23,13 @@
|
|||
__all__ = [
|
||||
'DeleteResponse',
|
||||
'ErrorResponse',
|
||||
'RepackResponse',
|
||||
'FileActionResponse',
|
||||
'RetrieveResponse',
|
||||
'StoreResponse'
|
||||
]
|
||||
|
||||
from DeleteResponse import DeleteResponse
|
||||
from ErrorResponse import ErrorResponse
|
||||
from RepackResponse import RepackResponse
|
||||
from FileActionResponse import FileActionResponse
|
||||
from RetrieveResponse import RetrieveResponse
|
||||
from StoreResponse import StoreResponse
|
||||
|
||||
|
|
|
@ -55,7 +55,8 @@ datastoreMap = {
|
|||
DeleteRequest: (datastore.delete, "DeleteRequest"),
|
||||
DeleteFilesRequest: (datastore.deleteFiles, "DeleteFilesRequest"),
|
||||
CreateDatasetRequest: (datastore.createDataset, "CreateDatasetRequest"),
|
||||
RepackRequest: (datastore.repack, "RepackRequest")
|
||||
RepackRequest: (datastore.repack, "RepackRequest"),
|
||||
CopyRequest: (datastore.copy, "CopyRequest")
|
||||
}
|
||||
|
||||
@Request.application
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#
|
||||
|
||||
import h5py, os, numpy, pypies, re, logging, shutil, time, types
|
||||
import fnmatch
|
||||
import subprocess, stat #for h5repack
|
||||
from pypies import IDataStore, StorageException, NotImplementedException
|
||||
from pypies import MkDirLockManager as LockManager
|
||||
|
@ -617,33 +618,72 @@ class H5pyDataStore(IDataStore.IDataStore):
|
|||
#id = group.id
|
||||
#id.link(dataset.name, linkName, h5py.h5g.LINK_SOFT)
|
||||
|
||||
def copy(self, request):
|
||||
resp = FileActionResponse()
|
||||
file = request.getFilename()
|
||||
pth = os.path.split(file)[0]
|
||||
repack = request.getRepack()
|
||||
action = self.__doCopy if not repack else self.__doRepack
|
||||
self.__doFileAction(file, pth, request.getOutputDir(), action, resp, request.getRepackCompression(), request.getTimestampCheck())
|
||||
return resp
|
||||
|
||||
def __doCopy(self, filepath, basePath, outputDir, compression):
|
||||
shutil.copy(filepath, outputDir)
|
||||
success = (os.path.isfile(os.path.join(outputDir, os.path.basename(filepath))))
|
||||
return success
|
||||
|
||||
def repack(self, request):
|
||||
resp = FileActionResponse()
|
||||
pth = request.getFilename()
|
||||
files = self.__listHdf5Files(pth)
|
||||
compression = request.getCompression()
|
||||
resp = RepackResponse()
|
||||
if os.path.exists(pth):
|
||||
self.__recurseRepack(pth, request, resp)
|
||||
for f in files:
|
||||
self.__doFileAction(f, pth, None, self.__doRepack, resp, compression)
|
||||
return resp
|
||||
|
||||
def __recurseRepack(self, pth, req, resp):
|
||||
files = os.listdir(pth)
|
||||
for f in files:
|
||||
fullpath = pth + '/' + f
|
||||
if os.path.isdir(fullpath):
|
||||
self.__recurseRepack(fullpath, req, resp)
|
||||
elif len(f) > 3 and f[-3:] == '.h5':
|
||||
self.__doRepack(fullpath, req, resp)
|
||||
def __listHdf5Files(self, pth):
|
||||
results = []
|
||||
for base, dirs, files in os.walk(pth):
|
||||
goodfiles = fnmatch.filter(files, '*.h5')
|
||||
results.extend(os.path.join(base, f) for f in goodfiles)
|
||||
return results
|
||||
|
||||
def __doRepack(self, fullpath, req, response):
|
||||
def __doRepack(self, filepath, basePath, outDir, compression):
|
||||
# call h5repack to repack the file
|
||||
if outDir is None:
|
||||
repackedFullPath = filepath + '.repacked'
|
||||
else:
|
||||
repackedFullPath = filepath.replace(basePath, outDir)
|
||||
cmd = ['h5repack', '-f', compression, filepath, repackedFullPath]
|
||||
ret = subprocess.call(cmd)
|
||||
|
||||
success = (ret == 0)
|
||||
if success:
|
||||
# repack was successful, replace the old file if we did it in the
|
||||
# same directory, otherwise leave it alone
|
||||
if outDir is None:
|
||||
os.remove(filepath)
|
||||
os.rename(repackedFullPath, filepath)
|
||||
os.chmod(filepath, stat.S_IWUSR | stat.S_IWGRP | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
else:
|
||||
# remove failed new file if there was one
|
||||
if os.path.exists(repackedFullPath):
|
||||
os.remove(repackedFullPath)
|
||||
if outDir is not None:
|
||||
# repack failed, but they wanted the data in a different
|
||||
# directory, so just copy the original data without the repack
|
||||
shutil.copy(filepath, repackedFullPath)
|
||||
return success
|
||||
|
||||
def __doFileAction(self, filepath, basePath, outputDir, fileAction, response, compression='NONE', timestampCheck=None):
|
||||
lock = None
|
||||
try:
|
||||
f, lock = self.__openFile(fullpath, 'a')
|
||||
f, lock = self.__openFile(filepath, 'a')
|
||||
proceedWithRepack = True
|
||||
timestampCheck = req.getTimestampCheck()
|
||||
if timestampCheck:
|
||||
if timestampCheck in f.attrs.keys():
|
||||
lastRepacked = f.attrs[timestampCheck]
|
||||
lastModified = os.stat(fullpath).st_mtime
|
||||
lastModified = os.stat(filepath).st_mtime
|
||||
if lastRepacked > lastModified:
|
||||
proceedWithRepack = False
|
||||
if proceedWithRepack:
|
||||
|
@ -654,30 +694,7 @@ class H5pyDataStore(IDataStore.IDataStore):
|
|||
f.attrs[timestampCheck] = time.time() + 30
|
||||
f.close()
|
||||
|
||||
# call h5repack to repack the file
|
||||
outDir = req.getOutputDir()
|
||||
if outDir is None:
|
||||
repackedFullPath = fullpath + '.repacked'
|
||||
else:
|
||||
repackedFullPath = fullpath.replace(req.getFilename(), outDir)
|
||||
cmd = ['h5repack', '-f', req.getCompression(), fullpath, repackedFullPath]
|
||||
ret = subprocess.call(cmd)
|
||||
success = (ret == 0)
|
||||
if success:
|
||||
# repack was successful, replace the old file if we did it in the
|
||||
# same directory, otherwise leave it alone
|
||||
if outDir is None:
|
||||
os.remove(fullpath)
|
||||
os.rename(repackedFullPath, fullpath)
|
||||
os.chmod(fullpath, stat.S_IWUSR | stat.S_IWGRP | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
else:
|
||||
# remove failed new file if there was one
|
||||
if os.path.exists(repackedFullPath):
|
||||
os.remove(repackedFullPath)
|
||||
if outDir is not None:
|
||||
# repack failed, but they wanted the data in a different
|
||||
# directory, so just copy the original data without the repack
|
||||
shutil.copy(fullpath, repackedFullPath)
|
||||
success = fileAction(filepath, basePath, outputDir, compression)
|
||||
|
||||
# update response
|
||||
if success:
|
||||
|
@ -688,20 +705,20 @@ class H5pyDataStore(IDataStore.IDataStore):
|
|||
setter = response.setFailedFiles
|
||||
responseList = getter()
|
||||
if responseList:
|
||||
responseList += [fullpath]
|
||||
responseList += [filepath]
|
||||
else:
|
||||
responseList = [fullpath]
|
||||
responseList = [filepath]
|
||||
setter(responseList)
|
||||
except Exception, e:
|
||||
logger.warn("Error repacking file " + fullpath + ": " + str(e))
|
||||
logger.warn("Error repacking file " + filepath + ": " + str(e))
|
||||
failed = response.getFailedFiles()
|
||||
if failed:
|
||||
failed += [fullpath]
|
||||
failed += [filepath]
|
||||
else:
|
||||
failed = [fullpath]
|
||||
failed = [filepath]
|
||||
response.setFailedFiles(failed)
|
||||
finally:
|
||||
if lock:
|
||||
LockManager.releaseLock(lock)
|
||||
LockManager.releaseLock(lock)
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue