awips2/pythonPackages/scientific/Scientific/BSP/IO.py
root e2ecdcfe33 Initial revision of AWIPS2 11.9.0-7p5
Former-commit-id: a02aeb236c [formerly 9f19e3f712] [formerly a02aeb236c [formerly 9f19e3f712] [formerly 06a8b51d6d [formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]]
Former-commit-id: 06a8b51d6d
Former-commit-id: 8e80217e59 [formerly 3360eb6c5f]
Former-commit-id: 377dcd10b9
2012-01-06 08:55:05 -06:00

278 lines
10 KiB
Python
Executable file

# Parallel IO
#
# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2007-5-25
#
"""
Parallel access to netCDF files
One netCDF dimension is defined for splitting the data among
processors such that each processor is responsible for one slice of
the file along that dimension.
Since netCDF files can be very big, the distribution algorithm gives
priority to memory efficiency over CPU time efficiency. The processor
that handles the file treats only one slice per superstep, which means
that at no time more than one slice must be stored in any processor.
"""
from Scientific.IO.NetCDF import NetCDFFile
from Scientific.BSP.core import ParClass, ParBase, ParInvalid, is_invalid
from Scientific import N
class _ParNetCDFFile(ParBase):

    """
    Distributed netCDF file

    A ParNetCDFFile object acts as much as possible like a NetCDFFile object.
    Variables become ParNetCDFVariable objects, which behave like
    distributed sequences. Variables that use the dimension named by
    |split_dimension| are automatically distributed among the processors
    such that each treats only one slice of the whole file.

    Attributes set by the constructor: C{pid}, C{nprocs}, C{filename},
    C{split} (name of the split dimension), C{local_access},
    C{read_only}, C{file} (the underlying NetCDFFile, or C{None} on
    processors that do not open it) and C{variables} (a dictionary
    mapping variable names to _ParNetCDFVariable objects). Once the
    length of the split dimension is known, C{first} and C{last} give the
    half-open index range handled by this processor.
    """

    def __parinit__(self, pid, nprocs, filename, split_dimension,
                    mode = 'r', local_access = False):
        """
        @param pid: stored as C{self.pid}; compared against 0 to decide
                    which processor owns the file (presumably the
                    processor number supplied by ParClass - confirm)
        @type pid: C{int}
        @param nprocs: stored as C{self.nprocs}; the number of pieces the
                       split dimension is divided into
        @type nprocs: C{int}
        @param filename: the name of the netCDF file
        @type filename: C{str}
        @param split_dimension: the name of the dimension along which the data
                                is distributed over the processors
        @type split_dimension: C{str}
        @param mode: read ('r'), write ('w'), or append ('a')
        @type mode: C{str}
        @param local_access: if C{False}, processor 0 is the only one to
                             access the file, all others communicate with
                             processor 0. If C{True} (only for reading), each
                             processor accesses the file directly. In the
                             latter case, the file must be accessible on all
                             processors under the same name. A third mode is
                             'auto', which uses some heuristics to decide
                             if the file is accessible everywhere: it checks
                             for existence of the file, then compares
                             the size on all processors, and finally verifies
                             that the same variables exist everywhere, with
                             identical names, types, and sizes.
                             NOTE(review): this constructor does not
                             special-case 'auto'; any true value (including
                             the string 'auto') is handled like C{True}.
        @type local_access: C{bool} or C{str}
        """
        if mode != 'r':
            # Local access is for reading only; writing and appending
            # always go through processor 0.
            local_access = 0
        self.pid = pid
        self.nprocs = nprocs
        self.filename = filename
        self.split = split_dimension
        self.local_access = local_access
        self.read_only = mode == 'r'
        if local_access or pid == 0:
            self.file = NetCDFFile(filename, mode)
            try:
                length = self.file.dimensions[split_dimension]
            except KeyError:
                # The split dimension does not exist (yet); the data is
                # divided later, in createDimension.
                length = None
            else:
                if length is None:
                    # Unlimited dimension: -1 marks "length still to be
                    # taken from the shape of a variable that uses it".
                    length = -1
            variables = {}
            for name, var in self.file.variables.items():
                variables[name] = (name, var.dimensions)
                # Explicit None test: do not rely on None < 0, which is
                # only defined under Python 2 ordering rules.
                if length is not None and length < 0 \
                       and split_dimension in var.dimensions:
                    index = list(var.dimensions).index(split_dimension)
                    length = var.shape[index]
        else:
            self.file = None
            length = None
            variables = None
        if not local_access:
            # Only processor 0 has read the file; hand its findings to
            # everybody else.
            length = self.broadcast(length)
            variables = self.broadcast(variables)
        if length is not None:
            self._divideData(length)
        self.variables = {}
        for name, var in variables.items():
            self.variables[name] = _ParNetCDFVariable(self, var[0], var[1],
                                                      split_dimension)

    def __repr__(self):
        return repr(self.filename)

    def close(self):
        """
        Close the underlying netCDF file on every processor that opened it.
        """
        if self.local_access or self.pid == 0:
            self.file.close()

    def createDimension(self, name, length):
        """
        Create a dimension in the file (done by processor 0 only). If it
        is the split dimension, the index range of each processor is
        computed from its length.

        @param name: the name of the dimension
        @type name: C{str}
        @param length: the length of the dimension, C{None} for unlimited
        @type length: C{int} or C{NoneType}
        @raise ValueError: if the split dimension is declared unlimited
        """
        if name == self.split:
            if length is None:
                raise ValueError("Split dimension cannot be unlimited")
            self._divideData(length)
        if self.pid == 0:
            self.file.createDimension(name, length)

    def createVariable(self, name, typecode, dimensions):
        """
        Create a variable in the file (done by processor 0 only) and
        register a distributed variable object for it on all processors.

        @param name: the name of the variable
        @type name: C{str}
        @param typecode: passed on to NetCDFFile.createVariable
        @param dimensions: passed on to NetCDFFile.createVariable
        @returns: the new entry of C{self.variables}
        """
        if self.pid == 0:
            var = self.file.createVariable(name, typecode, dimensions)
            dim = var.dimensions
        else:
            # Placeholder; replaced by processor 0's value in the
            # broadcast below.
            dim = 0
        name, dim = self.broadcast((name, dim))
        self.variables[name] = _ParNetCDFVariable(self, name, dim, self.split)
        return self.variables[name]

    def _divideData(self, length):
        """
        Divide C{length} indices along the split dimension into
        C{self.nprocs} consecutive chunks of equal size (rounded up) and
        store this processor's half-open range in C{self.first} and
        C{self.last}. When processor 0 is the only one accessing the
        file, it also stores the ranges of all processors in
        C{self.parts}.

        @param length: the length of the split dimension
        @type length: C{int}
        """
        # Floor division keeps the chunk size an integer whatever the
        # division semantics of the interpreter.
        chunk = (length + self.nprocs - 1) // self.nprocs
        self.first = min(self.pid*chunk, length)
        self.last = min(self.first+chunk, length)
        if (not self.local_access) and self.pid == 0:
            self.parts = []
            for pid in range(self.nprocs):
                # Clamp exactly like self.first above, so that parts[pid]
                # is the range processor pid computes for itself and
                # never has first > last.
                first = min(pid*chunk, length)
                last = min(first+chunk, length)
                self.parts.append((first, last))

    def sync(self):
        """
        Call sync() on the underlying netCDF file (processor 0 only).
        """
        if self.pid == 0:
            self.file.sync()

    flush = sync
class _ParNetCDFVariable(ParBase):

    """
    Distributed netCDF variable

    One variable of a _ParNetCDFFile. Indexing reads or writes the part
    of the variable that belongs to the calling processor along the
    split dimension. Unless the file uses local access, all file
    operations are done by processor 0 and the data is exchanged with
    C{self.put}, one processor per communication step.

    NOTE(review): C{put} and C{broadcast} are inherited from ParBase,
    which is not visible here; the code below relies on every processor
    making the same number of calls to them.
    """

    def __init__(self, file, name, dimensions, split_dimension):
        """
        @param file: the distributed file that owns this variable
        @type file: L{_ParNetCDFFile}
        @param name: the name of the variable in the netCDF file
        @type name: C{str}
        @param dimensions: the names of the dimensions of the variable
        @type dimensions: sequence of C{str}
        @param split_dimension: the name of the dimension along which the
                                data is distributed
        @type split_dimension: C{str}
        """
        self.file = file
        self.pid = file.pid
        self.nprocs = file.nprocs
        self.name = name
        self.dimensions = dimensions
        self.value = self
        # Cache for netCDF attribute values, filled by __getattr__ for
        # read-only files.
        self.attributes = {}
        try:
            # Position of the split dimension among this variable's
            # dimensions, None if the variable does not use it.
            self.index = list(dimensions).index(split_dimension)
        except ValueError:
            self.index = None

    def __repr__(self):
        return repr(self.name)

    def __getitem__(self, item):
        """
        Read this processor's share of the selected data.

        @param item: an index, a slice, a tuple of these, or an object
                     with an C{is_parindex} attribute
        @returns: the data read for this processor, or C{ParInvalid} if
                  there is none
        """
        item = self._prepareIndices(item)
        # Start from "no data" so that a processor that never receives a
        # message yields ParInvalid instead of failing on an unbound name.
        data = None
        if self.file.local_access:
            data = self._readData(item, self.file.first, self.file.last)
        elif self.pid == 0:
            # Read and ship one slice per step, which keeps at most one
            # foreign slice in memory at any time.
            for pid in range(1, self.nprocs):
                first, last = self.file.parts[pid]
                data = self._readData(item, first, last)
                self.put(data, [pid])
            data = self._readData(item, self.file.first, self.file.last)
        else:
            # Take part in every step; keep whatever arrives.
            for pid in range(1, self.nprocs):
                messages = self.put(None, [])
                if messages:
                    data = messages[0]
        if data is None:
            return ParInvalid
        return data

    def __getslice__(self, first, last):
        return self.__getitem__(slice(first, last))

    def __setitem__(self, item, value):
        """
        Write each processor's value to its share of the selected data.
        All writing is done by processor 0.

        @param item: an index, a slice, a tuple of these, or an object
                     with an C{is_parindex} attribute
        @param value: the data of this processor; invalid values (as
                      judged by C{is_invalid}) are not written
        """
        item = self._prepareIndices(item)
        if is_invalid(value):
            value = None
        if self.pid == 0:
            if value is not None:
                self._writeData(item, value, self.file.first, self.file.last)
            if self.index is not None:
                # Collect the values of the other processors, one per
                # step, and write each to that processor's range.
                for pid in range(1, self.nprocs):
                    first, last = self.file.parts[pid]
                    data = self.put(None, [])
                    if data and data[0] is not None:
                        self._writeData(item, data[0], first, last)
        elif self.index is not None:
            for pid in range(1, self.nprocs):
                if pid == self.pid:
                    # This processor's turn: send the value to processor 0.
                    self.put(value, [0])
                else:
                    self.put(None, [])

    def __setslice__(self, first, last, value):
        self.__setitem__(slice(first, last), value)

    def _prepareIndices(self, item):
        """
        Turn C{item} into a tuple with one entry per dimension, padding
        with full slices. Objects with an C{is_parindex} attribute are
        returned unchanged.
        """
        if not hasattr(item, 'is_parindex'):
            if type(item) != type(()):
                item = (item,)
            item = item + (len(self.dimensions)-len(item))*(slice(None),)
        return item

    def _readData(self, item, part_first, part_last):
        """
        Read the part of C{item} that lies in the half-open range
        C{part_first} to C{part_last} along the split dimension.

        @returns: the data, or C{None} if L{_indices} returns C{None}
        """
        item = self._indices(item, part_first, part_last)
        if item is None:
            return None
        return self.file.file.variables[self.name][item]

    def _writeData(self, item, data, part_first, part_last):
        """
        Write C{data} to the part of C{item} that lies in the half-open
        range C{part_first} to C{part_last} along the split dimension.
        Empty data is silently skipped.
        """
        try:
            if len(data) == 0:
                return
        except TypeError:
            # Objects without a length (e.g. scalars) are written as is.
            pass
        item = self._indices(item, part_first, part_last)
        if item is None:
            return
        variable = self.file.file.variables[self.name]
        try:
            variable[item] = N.array(data)
        except Exception:
            # Show what did not fit together, then let the error through.
            # Single-argument print(...) gives the same output whether
            # print is a statement or a function.
            print(variable.shape)
            print(item)
            print(N.array(data).shape)
            raise

    def _indices(self, item, part_first, part_last):
        """
        Translate C{item} into an index for the netCDF variable,
        restricted to the half-open range C{part_first} to C{part_last}
        along the split dimension.

        @returns: the index to apply to the netCDF variable, or C{None}
                  for an C{is_parindex} object whose C{valid} flag is
                  false
        @raise ValueError: if an integer is used along the split dimension
        """
        if hasattr(item, 'is_parindex'):
            # The start/stop of these objects are offsets relative to
            # the beginning of the part.
            if not item.valid:
                return None
            if item.skip == 0:
                return item.start+part_first
            return slice(item.start+part_first, item.stop+part_first,
                         item.skip)
        if self.index is not None:
            split = item[self.index]
            if isinstance(split, int):
                raise ValueError("Must use slice along split dimension")
            first, last, skip = split.start, split.stop, split.step
            if first is None:
                first = 0
            if skip is None:
                skip = 1
            # Number of whole strides needed to move the start to the
            # first selected index at or after part_first. Floor
            # division keeps the result an integer.
            n1 = max(0, (part_first-first+skip-1)//skip)
            first = first + n1*skip
            if last is None:
                last = part_last
            last = min(last, part_last)
            item = item[:self.index] + (slice(first, last, skip),) + \
                   item[self.index+1:]
        return item

    def __getattr__(self, attr):
        """
        Return the attribute C{attr} of the underlying netCDF variable.
        Without local access the value is obtained by processor 0 and
        broadcast; for read-only files it is then cached in
        C{self.attributes}.
        """
        if attr in ('file', 'name', 'pid', 'attributes'):
            # These are set in __init__ and read below. Ending up here
            # for one of them means the object is not initialized, and
            # looking it up again would recurse without end.
            raise AttributeError(attr)
        if self.file.local_access:
            return getattr(self.file.file.variables[self.name], attr)
        try:
            return self.attributes[attr]
        except KeyError:
            pass
        if self.pid == 0:
            value = getattr(self.file.file.variables[self.name], attr)
        else:
            value = None
        value = self.broadcast(value)
        if self.file.read_only:
            # The value can only be trusted to stay the same if the file
            # is not being written to.
            self.attributes[attr] = value
        return value

    def __len__(self):
        # Number of indices along the split dimension that belong to
        # this processor.
        return self.file.last - self.file.first
# Public names of this module: each implementation class defined above is
# passed through ParClass (imported from Scientific.BSP.core).
# NOTE(review): ParClass is not visible here; presumably it produces the
# class that user code instantiates and supplies the pid and nprocs
# arguments of __parinit__ - confirm against Scientific.BSP.core.
ParNetCDFVariable = ParClass(_ParNetCDFVariable)
ParNetCDFFile = ParClass(_ParNetCDFFile)