Issue #1332: Update pypies to create/use condensed group structure

Change-Id: I2e7311b063df1a217a2fe37c0e470272e4e85e92

Former-commit-id: c72c0b9f98 [formerly 90f3cac62473c66dcdef73a6a605becf08868316]
Former-commit-id: dbe4290f45
Richard Peter 2012-11-21 15:23:23 -06:00
parent a482cd3f17
commit 2b3f0a2938

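A minimal sketch of the "condensed group structure" this commit introduces (an editorial illustration, not part of the diff): deep group paths are collapsed into a single HDF5 group whose name joins the interior path tokens with '::', as the new __getNode method below does via '::'.join(tokens).

def condensedPath(groupName):
    # '/group1/group2/group3' -> 'group1::group2::group3'
    tokens = filter(None, groupName.split('/'))
    return '::'.join(tokens)

assert condensedPath('/group1/group2/group3') == 'group1::group2::group3'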
@@ -1,19 +1,19 @@
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
@@ -21,17 +21,17 @@
#
# h5py implementation of IDataStore
#
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 06/16/10 njensen Initial Creation.
# 05/03/11 9134 njensen Optimized for pointdata
# 10/09/12 rjpeter Optimized __getGroup for retrievals
#
#
import h5py, os, numpy, pypies, re, logging, shutil, time, types
@@ -48,7 +48,7 @@ from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.response import *
logger = pypies.logger
timeMap = pypies.timeMap
vlen_str_type = h5py.new_vlen(str)
dataRecordMap = {
@@ -70,12 +70,12 @@ REQUEST_ALL.setType('ALL')
PURGE_REGEX = re.compile('(/[a-zA-Z]{1,25})/([0-9]{4}-[0-9]{2}-[0-9]{2})_([0-9]{2}):[0-9]{2}:[0-9]{2}')
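# Illustration (not part of the diff; the path is hypothetical): PURGE_REGEX
# splits a dated group path into its leading group, date, and hour, e.g.
#   m = PURGE_REGEX.match('/satellite/2012-11-21_15:23:23')
#   m.group(1) == '/satellite'; m.group(2) == '2012-11-21'; m.group(3) == '15'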
class H5pyDataStore(IDataStore.IDataStore):
def __init__(self):
pass
def store(self, request):
fn = request.getFilename()
f, lock = self.__openFile(fn, 'w')
try:
recs = request.getRecords()
@@ -94,18 +94,18 @@ class H5pyDataStore(IDataStore.IDataStore):
except:
logger.warn("Exception occurred on file " + fn + ":" + IDataStore._exc())
exc.append(IDataStore._exc())
failRecs.append(r)
if ss:
status.setOperationPerformed(ss['op'])
if ss.has_key('index'):
status.setIndexOfAppend(ss['index'])
t1=time.time()
timeMap['store']=t1-t0
resp = StoreResponse()
resp.setStatus(status)
resp.setExceptions(exc)
resp.setFailedRecords(failRecs)
return resp
finally:
t0=time.time()
@@ -113,33 +113,34 @@ class H5pyDataStore(IDataStore.IDataStore):
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def __writeHDF(self, f, record, storeOp):
props = record.getProps()
if props and not props.getChunked() and props.getCompression() and \
props.getCompression() != 'NONE':
raise StorageException('Data must be chunked to be compressed')
data = record.retrieveDataObject()
rootNode=f['/']
group = self.__getNode(rootNode, record.getGroup(), None, create=True)
if record.getMinIndex() is not None and len(record.getMinIndex()):
ss = self.__writePartialHDFDataset(f, data, record.getDimension(), record.getSizes(),
group[record.getName()], props, record.getMinIndex())
else:
ss = self.__writeHDFDataset(f, data, record.getDimension(), record.getSizes(), record.getName(),
group, props, self.__getHdf5Datatype(record), storeOp, record)
if props and props.getDownscaled():
intName = record.getGroup() + '/' + record.getName() + '-interpolated'
intGroup = self.__getNode(rootNode, intName, None, create=True)
self.__link(intGroup, '0', group[record.getName()])
f.flush()
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Stored group " + str(record.getGroup()) + " to group " + str(group))
return ss
def __writeHDFDataset(self, f, data, dims, szDims, dataset, group, props, dataType, storeOp, rec):
nDims = len(szDims)
szDims1 = [None,] * nDims
@@ -150,16 +151,16 @@ class H5pyDataStore(IDataStore.IDataStore):
if recMaxDims == None or recMaxDims[i] == 0:
maxDims[i] = None
else:
maxDims[i] = recMaxDims[i]
if type(data) is numpy.ndarray and data.shape != tuple(szDims1):
data = data.reshape(szDims1)
ss = {}
if dataset in group.keys():
ds = group[dataset]
if storeOp == 'STORE_ONLY':
raise StorageException('Dataset ' + str(dataset) + ' already exists in group ' + str(group))
elif storeOp == 'APPEND':
if dims == 1:
newSize = [ds.shape[0] + szDims1[0]]
@@ -178,35 +179,35 @@ class H5pyDataStore(IDataStore.IDataStore):
elif storeOp == 'REPLACE' or storeOp == 'OVERWRITE':
if ds.shape != data.shape:
ds.resize(data.shape)
ds[()] = data
ss['op'] = 'REPLACE'
else:
chunk = self.__calculateChunk(nDims, dataType, storeOp, maxDims)
compression = None
if props:
compression = props.getCompression()
fillValue = rec.getFillValue()
ds = self.__createDatasetInternal(group, dataset, dataType, szDims1, maxDims, chunk, compression, fillValue)
#ds = group.create_dataset(dataset, szDims1, dataType, maxshape=maxDims, chunks=chunk, compression=compression)
ds[()] = data
ss['op'] = 'STORE_ONLY'
self.__writeProperties(rec, ds)
return ss
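# Editorial summary of the storeOp branches above: STORE_ONLY raises if the
# dataset already exists, APPEND grows the dataset along its first dimension
# before writing, REPLACE/OVERWRITE resize to the incoming shape and write in
# place, and a dataset that does not yet exist is created and reported as
# STORE_ONLY.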
def __getHdf5Datatype(self, record):
dtype = dataRecordMap[record.__class__]
if dtype == types.StringType:
from h5py import h5t
size = record.getMaxLength()
if size > 0:
dtype = h5t.py_create('S' + str(size))
else:
dtype = vlen_str_type
#dtype.set_strpad(h5t.STR_NULLTERM)
return dtype
def __calculateChunk(self, nDims, dataType, storeOp, maxDims):
if nDims == 1:
chunk = [DEFAULT_CHUNK_SIZE]
@@ -222,13 +223,13 @@ class H5pyDataStore(IDataStore.IDataStore):
# So fixed dimension gets chunk size of:
# blocksize / (fixed dimension size * bytesize)
# For example, float of dimensions (infinite, 6) would be
# (int(4096 / (6 * 4)), 6)
if dataType != vlen_str_type:
sizeOfEntry = numpy.dtype(dataType).itemsize
else:
sizeOfEntry = None
secondDim = maxDims[1]
if sizeOfEntry:
chunkSize = int(FILESYSTEM_BLOCK_SIZE / (secondDim * sizeOfEntry))
chunk = [chunkSize, secondDim]
@@ -239,111 +240,113 @@ class H5pyDataStore(IDataStore.IDataStore):
"data with mode " + storeOp + " not supported yet")
chunk = tuple(chunk)
return chunk
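# Worked example of the block-size arithmetic above (a sketch assuming the
# 4096-byte FILESYSTEM_BLOCK_SIZE cited in the comment): for a float32
# dataset with maxDims of (None, 6), i.e. unlimited rows of 6 columns,
#   sizeOfEntry = numpy.dtype(numpy.float32).itemsize    # 4 bytes
#   chunkSize = int(4096 / (6 * 4))                      # 170 rows per chunk
#   chunk == (170, 6)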
def __writeProperties(self, dr, dataset):
attrs = dr.getDataAttributes()
if attrs:
for key in attrs:
dataset.attrs[key] = attrs[key]
def __storeInterpolated(self, f, rec, op):
if op != 'REPLACE' and op != 'STORE_ONLY':
raise StorageException("Only replace and store modes are supported with interpolation enabled")
# store the base product
ss = self.__writeHDF(f, rec, op)
sizes = rec.getSizes()
newSzX = sizes[1]
newSzY = sizes[0]
originalName = rec.getName()
originalGroup = rec.getGroup()
level = 0
# avoid recursive links by turning off downscaling
rec.getProps().setDownscaled(False)
from PIL import Image
import time
while newSzX > DOWNSCALE_THRESHOLD or newSzY > DOWNSCALE_THRESHOLD:
data = rec.retrieveDataObject()
# data is potentially 1-dimensional from serialization, ensure it goes to correct 2 dimensions
data = data.reshape([newSzX, newSzY])
newSzX = newSzX / 2
newSzY = newSzY / 2
level += 1
rec.setName(str(level))
rec.setGroup(originalGroup + '/' + originalName + '-interpolated')
# satellite data comes in as signed bytes but pil requires unsigned bytes
data = numpy.array(data + 127, dtype=numpy.uint8)
image = Image.fromarray(data)
image = image.resize((newSzY, newSzX))
downsized = numpy.array(image)
# transform back to signed bytes
downsized = numpy.array(downsized - 127, dtype=numpy.int8)
rec.putDataObject(downsized)
rec.setSizes([newSzY, newSzX])
self.__writeHDF(f, rec, op)
return ss
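# Sketch of the signed/unsigned shift used above (illustration only): PIL
# needs uint8 pixels, so the int8 satellite values are shifted by +127 on the
# way in and by -127 on the way back out, a round trip that preserves the
# original values:
#   signed = numpy.array([-128, 0, 127], dtype=numpy.int8)
#   unsigned = numpy.array(signed + 127, dtype=numpy.uint8)
#   restored = numpy.array(unsigned - 127, dtype=numpy.int8)   # == signed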
def __writePartialHDFDataset(self, f, data, dims, szDims, ds, props,
minIndex):
# reverse sizes for hdf5
szDims1 = [None, ] * len(szDims)
for i in range(len(szDims)):
szDims1[i] = szDims[len(szDims) - i - 1]
offset = [None, ] * len(minIndex)
for i in range(len(minIndex)):
offset[i] = minIndex[len(minIndex) - i - 1]
# process chunking
# chunkSize = None
# if data.dtype != numpy._string and data.dtype != numpy._object:
# chunkSize = DEFAULT_CHUNK_SIZE
# else:
# chunkSize = 1
# chunk = [chunkSize] * len(szDims)
data = data.reshape(szDims1)
endIndex = [offset[0] + szDims1[0], offset[1] + szDims1[1]]
ds[offset[0]:endIndex[0], offset[1]:endIndex[1]] = data
return {'op':'REPLACE'}
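# Minimal sketch of the partial write above (hypothetical file and names):
# sizes and minIndex arrive in (x, y) order, are reversed for h5py's
# row-major layout, and the data is written as a hyperslab at that offset:
#   f = h5py.File('/tmp/partial-demo.h5', 'w')
#   ds = f.create_dataset('Data', (4, 4), 'f4')
#   ds[1:3, 1:3] = numpy.ones((2, 2), 'f4')   # offset (1, 1), size (2, 2)
#   f.close()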
def delete(self, request):
fn = request.getFilename()
f, lock = self.__openFile(fn, 'w')
resp = DeleteResponse()
resp.setSuccess(True)
deleteFile = False
try:
locs = request.getLocations()
for dataset in locs:
ds = self.__getNode(f, None, dataset)
grp = ds.parent
grp.id.unlink(ds.name)
finally:
# check if file has any remaining data sets
# if no data sets, flag file for deletion
deleteFile = False
try:
f.flush()
deleteFile = not self.__hasDataSet(f)
except Exception, e:
logger.error('Error occurred checking for dataSets in file [' + str(fn) + ']: ' + IDataStore._exc())
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
if deleteFile:
try:
os.remove(fn)
except Exception, e:
logger.error('Error occurred deleting file [' + str(fn) + ']: ' + IDataStore._exc())
LockManager.releaseLock(lock)
return resp
@@ -358,18 +361,20 @@ class H5pyDataStore(IDataStore.IDataStore):
if self.__hasDataSet(child):
return True
return False
def retrieve(self, request):
fn = request.getFilename()
f, lock = self.__openFile(fn, 'r')
try:
group = request.getGroup()
req = request.getRequest()
rootNode=f['/']
if req:
ds = self.__getNode(rootNode, group, request.getDataset())
result = [self.__retrieveInternal(ds, req)]
else:
groupNode = self.__getNode(rootNode, group)
result = self.__retrieve(groupNode, request.getIncludeInterpolated())
resp = RetrieveResponse()
resp.setRecords(result)
return resp
@@ -379,32 +384,29 @@ class H5pyDataStore(IDataStore.IDataStore):
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def __retrieve(self, group, includeInterpolated=False):
records = []
datasets = group.keys()
for ds in datasets:
interpDs = ds.endswith('-interpolated')
if includeInterpolated and interpDs:
subresults = self.__retrieve(group[ds], False)
if subresults:
records += subresults
elif not interpDs:
rec = self.__retrieveInternal(group[ds], REQUEST_ALL)
records.append(rec)
return records
def __retrieveInternal(self, ds, req):
rawData = HDF5OpManager.read(ds, req)
rec = DataStoreFactory.createStorageRecord(rawData, ds)
return rec
def retrieveDatasets(self, request):
t6 = time.time()
fn = request.getFilename()
@@ -420,8 +422,9 @@ class H5pyDataStore(IDataStore.IDataStore):
names = request.getDatasetGroupPath()
req = request.getRequest()
result = []
rootNode=f['/']
for dsName in names:
ds = self.__getNode(rootNode, None, dsName)
t2 = time.time()
rawData = HDF5OpManager.read(ds, req)
t3 = time.time()
@@ -430,7 +433,7 @@ class H5pyDataStore(IDataStore.IDataStore):
paramMap[dsName] = ('%.3f' % (diff))
rec = DataStoreFactory.createStorageRecord(rawData, ds)
result.append(rec)
resp = RetrieveResponse()
resp.setRecords(result)
return resp
@@ -439,7 +442,7 @@ class H5pyDataStore(IDataStore.IDataStore):
t4 = time.time()
LockManager.releaseLock(lock)
t5 = time.time()
if logger.isEnabledFor(logging.DEBUG):
logger.debug("pid=" + str(os.getpid()) + " filename=" + fn + \
", numberOfDatasets/Parameters=" + str(len(names)) + \
", getLockTime=" + ('%.3f' % (t1-t0)) + \
@@ -447,7 +450,7 @@ class H5pyDataStore(IDataStore.IDataStore):
", releaseLockTime=" + ('%.3f' % (t5-t4)) + \
", retrieveDatasetsTotal=" + ('%.3f' % (t4-t6)) + \
", perParamRead=" + str(paramMap))
def retrieveGroups(self, request):
fn = request.getFilename()
f, lock = self.__openFile(fn, 'r')
@@ -455,12 +458,14 @@ class H5pyDataStore(IDataStore.IDataStore):
groups = request.getGroups()
req = request.getRequest()
recs = []
rootNode=f['/']
for group in groups:
grp = self.__getNode(rootNode, group)
datasets = grp.keys()
for ds in datasets:
dsNode=grp[ds]
rawData = HDF5OpManager.read(dsNode, req)
rec = DataStoreFactory.createStorageRecord(rawData, dsNode)
recs.append(rec)
resp = RetrieveResponse()
resp.setRecords(recs)
@@ -471,13 +476,13 @@ class H5pyDataStore(IDataStore.IDataStore):
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def getDatasets(self, request):
fn = request.getFilename()
f, lock = self.__openFile(fn, 'r')
try:
grpName = request.getGroup()
grp = self.__getNode(f['/'], grpName)
ds = grp.keys()
return ds
finally:
@@ -486,7 +491,7 @@ class H5pyDataStore(IDataStore.IDataStore):
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def deleteFiles(self, request):
fn = request.getFilename()
if os.path.exists(fn):
@@ -494,40 +499,40 @@ class H5pyDataStore(IDataStore.IDataStore):
self.__recursiveDeleteFiles(fn,request.getDatesToDelete())
else:
self.__removeFile(fn)
# if it didn't succeed, an exception will be thrown and therefore
# an error response returned
resp = DeleteResponse()
resp.setSuccess(True)
return resp
def createDataset(self, request):
fn = request.getFilename()
f, lock = self.__openFile(fn, 'w')
try:
rec = request.getRecord()
props = rec.getProps()
if props and not props.getChunked() and props.getCompression != 'NONE':
raise StorageException("Data must be chunked to be compressed")
grp = rec.getGroup()
group = self.__getNode(f['/'], grp, None, create=True)
# reverse sizes for hdf5
szDims = rec.getSizes()
szDims1 = [None, ] * len(szDims)
for i in range(len(szDims)):
szDims1[i] = szDims[len(szDims) - i - 1]
szDims = tuple(szDims1)
chunks = None
if props and props.getChunked():
chunks = (DEFAULT_CHUNK_SIZE,) * len(szDims)
compression = None
if props:
compression = props.getCompression()
dtype = self.__getHdf5Datatype(rec)
datasetName = rec.getName()
fillValue = rec.getFillValue()
ds = self.__createDatasetInternal(group, datasetName, dtype, szDims,
@@ -542,9 +547,9 @@ class H5pyDataStore(IDataStore.IDataStore):
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def __createDatasetInternal(self, group, datasetName, dtype, szDims,
maxDims=None, chunks=None, compression=None, fillValue=None):
plc = h5py.h5p.create(h5py.h5p.DATASET_CREATE)
if fillValue is not None:
fVal = numpy.zeros(1, dtype)
@@ -556,21 +561,21 @@ class H5pyDataStore(IDataStore.IDataStore):
if compression == 'LZF':
plc.set_shuffle()
plc.set_filter(h5py.h5z.FILTER_LZF, h5py.h5z.FLAG_OPTIONAL)
szDims = tuple(szDims)
if maxDims is not None:
maxDims = tuple(x if x is not None else h5py.h5s.UNLIMITED for x in maxDims)
space_id = h5py.h5s.create_simple(szDims, maxDims)
type_id = h5py.h5t.py_create(dtype, logical=True)
id = h5py.h5d.create(group.id, datasetName, type_id, space_id, plc)
ds = group[datasetName]
return ds
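# Context for the low-level calls above (an editorial note): building the
# dataset from a creation property list lets the fill value, shuffle, and LZF
# filter be applied together; the roughly equivalent high-level call, minus
# the fill value, is the commented-out group.create_dataset shown earlier in
# __writeHDFDataset.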
def __recursiveDeleteFiles(self, dir, datesToDelete):
if os.path.exists(dir) and os.path.isdir(dir):
if datesToDelete is None:
self.__removeDir(dir)
else:
@@ -598,11 +603,11 @@ class H5pyDataStore(IDataStore.IDataStore):
if gotLock:
os.remove(path)
else:
raise StorageException('Unable to acquire lock on file ' + path + ' for deleting')
finally:
if gotLock:
LockManager.releaseLock(lock)
def __removeDir(self, path):
gotLock = False
try:
@@ -610,14 +615,14 @@ class H5pyDataStore(IDataStore.IDataStore):
if gotLock:
shutil.rmtree(path)
else:
raise StorageException('Unable to acquire lock on file ' + path + ' for deleting')
finally:
if gotLock:
LockManager.releaseLock(lock)
def __openFile(self, filename, mode='r'):
if mode == 'r' and not os.path.exists(filename):
raise StorageException('File ' + filename + ' does not exist')
gotLock, fd = LockManager.getLock(filename, mode)
t0=time.time()
if not gotLock:
@@ -635,39 +640,89 @@ class H5pyDataStore(IDataStore.IDataStore):
t1=time.time()
timeMap['openFile']=t1-t0
return f, fd
def __getNode(self, rootNode, groupName, dsName=None, create=False):
t0=time.time()
# expected output to be node for /group1::group2::group3/dataSet
# expected output of /group1::group2::group3/dataSet-interpolated/1
if groupName:
if dsName:
toNormalize=groupName + '/' + dsName
else:
toNormalize=groupName
elif dsName:
toNormalize=dsName
else:
# both None, return root node as default
return rootNode
tokens=toNormalize.split('/')
# remove any empty tokens
tokens = filter(None, tokens)
dsNameToken=None
if dsName:
# data set name was given, keep last token for ds name
dsNameToken = tokens.pop()
# need to check final token for -interpolated
isInterpToken = None
if tokens:
isInterpToken = tokens[-1]
if isInterpToken.endswith('-interpolated'):
del tokens[-1]
if dsNameToken:
dsNameToken = isInterpToken + '/' + dsNameToken
else:
dsNameToken = isInterpToken
if tokens:
basePath='::'.join(tokens)
else:
basePath=None
node = None
if create:
if basePath is None:
node = rootNode
elif basePath in rootNode.keys():
node = rootNode[basePath]
else:
node = rootNode.create_group(basePath)
if dsNameToken:
for token in dsNameToken.split('/'):
if token in node.keys():
node = node[token]
else:
node = node.create_group(token)
else:
if dsNameToken:
if basePath:
basePath += '/' + dsNameToken
else:
basePath = dsNameToken
try:
if basePath:
node = rootNode[basePath]
else:
node = rootNode
except:
group = None
if groupName:
group = groupName
if dsName:
group += '/' + dsName
elif dsName:
group = dsName
# check old group structure
node = self.__getGroup(rootNode, group)
t1=time.time()
if timeMap.has_key('getGroup'):
@@ -675,16 +730,32 @@ class H5pyDataStore(IDataStore.IDataStore):
else:
timeMap['getGroup']=t1-t0
return node
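# Editorial walk-through of the normalization above, using the names from the
# "expected output" comments:
#   __getNode(root, '/group1/group2/group3', 'dataSet')
#     -> group 'group1::group2::group3', then child 'dataSet'
#   __getNode(root, '/group1/group2/group3/dataSet-interpolated', '1')
#     -> group 'group1::group2::group3', then 'dataSet-interpolated/1'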
# deprecated, should only be called in transition period
def __getGroup(self, rootNode, name):
if name is None or len(name.strip()) == 0:
# if no group is specified default to base group
grp = rootNode
else:
try:
group=name
if group.startswith('/'):
group = group[1:]
grp = rootNode[group]
except:
raise StorageException("No group " + name + " found")
return grp
def __link(self, group, linkName, dataset):
# this is a hard link
group[linkName] = dataset
# untested code, this would be if we went with symbolic links, this is unpublished h5py API
#id = group.id
#id.link(dataset.name, linkName, h5py.h5g.LINK_SOFT)
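# Sketch of the hard link above (hypothetical file): in h5py, assigning an
# existing dataset to a new name in a group creates another hard link to the
# same underlying data:
#   f = h5py.File('/tmp/link-demo.h5', 'w')
#   ds = f.create_dataset('0', data=[1, 2, 3])
#   f['alias'] = ds   # two names, one on-disk dataset
#   f.close()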
def copy(self, request):
resp = FileActionResponse()
file = request.getFilename()
@@ -693,12 +764,12 @@ class H5pyDataStore(IDataStore.IDataStore):
action = self.__doCopy if not repack else self.__doRepack
self.__doFileAction(file, pth, request.getOutputDir(), action, resp, request.getRepackCompression(), request.getTimestampCheck())
return resp
def __doCopy(self, filepath, basePath, outputDir, compression):
shutil.copy(filepath, outputDir)
success = (os.path.isfile(os.path.join(outputDir, os.path.basename(filepath))))
return success
def repack(self, request):
resp = FileActionResponse()
pth = request.getFilename()
@@ -714,17 +785,17 @@ class H5pyDataStore(IDataStore.IDataStore):
goodfiles = fnmatch.filter(files, '*.h5')
results.extend(os.path.join(base, f) for f in goodfiles)
return results
def __doRepack(self, filepath, basePath, outDir, compression):
t0=time.time()
# call h5repack to repack the file
if outDir is None:
repackedFullPath = filepath + '.repacked'
else:
repackedFullPath = filepath.replace(basePath, outDir)
cmd = ['h5repack', '-f', compression, filepath, repackedFullPath]
ret = subprocess.call(cmd)
success = (ret == 0)
if success:
# repack was successful, replace the old file if we did it in the
@@ -747,7 +818,7 @@ class H5pyDataStore(IDataStore.IDataStore):
else:
timeMap['repack']=t1-t0
return success
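# Equivalent shell invocation for reference (sample paths and a sample GZIP
# filter; the actual compression string comes from the request):
#   h5repack -f GZIP=1 /path/to/file.h5 /path/to/file.h5.repacked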
def __doFileAction(self, filepath, basePath, outputDir, fileAction, response, compression='NONE', timestampCheck=None):
lock = None
try:
@@ -766,23 +837,23 @@ class H5pyDataStore(IDataStore.IDataStore):
if timestampCheck:
f.attrs[timestampCheck] = time.time() + 30
f.close()
success = fileAction(filepath, basePath, outputDir, compression)
# update response
if success:
getter = response.getSuccessfulFiles
setter = response.setSuccessfulFiles
else:
getter = response.getFailedFiles
setter = response.setFailedFiles
responseList = getter()
if responseList:
responseList += [filepath]
else:
responseList = [filepath]
setter(responseList)
except Exception, e:
logger.warn("Error repacking file " + filepath + ": " + str(e))
failed = response.getFailedFiles()
if failed:
@@ -793,5 +864,3 @@ class H5pyDataStore(IDataStore.IDataStore):
finally:
if lock:
LockManager.releaseLock(lock)