diff --git a/build/deploy.edex.awips2/esb/etc/ingestGrib.sh b/build/deploy.edex.awips2/esb/etc/ingestGrib.sh
index fe483a45f5..145b9f0105 100644
--- a/build/deploy.edex.awips2/esb/etc/ingestGrib.sh
+++ b/build/deploy.edex.awips2/esb/etc/ingestGrib.sh
@@ -19,7 +19,7 @@
 # further licensing information.
 ##
 export INIT_MEM=128 # in Meg
-export MAX_MEM=768 # in Meg
+export MAX_MEM=1024 # in Meg
 export METADATA_POOL_MAX=10
 export EDEX_DEBUG_PORT=5007
diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/res/spring/grib-decode.xml b/edexOsgi/com.raytheon.edex.plugin.grib/res/spring/grib-decode.xml
index 33abb46afd..142f82dd47 100644
--- a/edexOsgi/com.raytheon.edex.plugin.grib/res/spring/grib-decode.xml
+++ b/edexOsgi/com.raytheon.edex.plugin.grib/res/spring/grib-decode.xml
@@ -29,14 +29,15 @@
         class="com.raytheon.edex.plugin.grib.spatial.GribSpatialCache"
         factory-method="getInstance" depends-on="gridcoveragelookup"/>
 [remainder of hunk garbled in extraction: the XML element markup was stripped, leaving only bare +/- markers and the text node "java.lang.Throwable"]
@@ -86,5 +87,27 @@
 [hunk garbled in extraction: only the expression "${body}" survives; judging by the grib-persister.* properties below, these two hunks evidently replace the grid-assembler wiring with the new gribPersister bean and route]
diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/resources/com.raytheon.edex.plugin.grib.properties b/edexOsgi/com.raytheon.edex.plugin.grib/resources/com.raytheon.edex.plugin.grib.properties
index 8e07f6f3e5..c5e0a3893c 100644
--- a/edexOsgi/com.raytheon.edex.plugin.grib/resources/com.raytheon.edex.plugin.grib.properties
+++ b/edexOsgi/com.raytheon.edex.plugin.grib/resources/com.raytheon.edex.plugin.grib.properties
@@ -1,17 +1,17 @@
-# the number of grib decode threads.
-grib-decode.count.threads=4
-
 # the number of grib split threads.
 grib-split.count.threads=2
 
-# the number of grib assembler threads.
-grib-assembler.count.threads=2
+# the number of grib decode threads.
+grib-decode.count.threads=4
+
+# the number of grib persist threads.
+grib-persister.count.threads=4
 
 # Maximum number of grid points to decode at one time for all threads. Large
 # grib files may cause the decoder to reach this limit and then some threads
 # will have to wait. This can be used to control the amount of memory used by
 # the decoder.
-grib-decode.count.gridpoints=6000000
+grib-decode.count.gridpoints=12000000
 
-# Maximum number of grids in MB that are allowed to be waiting for grid assembly.
-grib-assembler.count.mb=100
+# Maximum number of grids in MB that are allowed to be in memory waiting to be persisted.
+grib-persister.count.mb=200
diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribDecoder.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribDecoder.java
index e5d093e491..432c4daa08 100644
--- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribDecoder.java
+++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribDecoder.java
@@ -27,7 +27,9 @@ import org.apache.camel.Processor;
 import com.raytheon.edex.plugin.grib.exception.GribException;
 import com.raytheon.uf.common.dataplugin.grid.GridRecord;
 import com.raytheon.uf.common.status.IPerformanceStatusHandler;
+import com.raytheon.uf.common.status.IUFStatusHandler;
 import com.raytheon.uf.common.status.PerformanceStatus;
+import com.raytheon.uf.common.status.UFStatus;
 import com.raytheon.uf.common.time.util.ITimer;
 import com.raytheon.uf.common.time.util.TimeUtil;
 
@@ -45,6 +47,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
  *                                     added status to process.
  * Oct 07, 2013  2402     bsteffen    Decode GribDecodeMessage instead of
  *                                     files.
+ * Sep 14, 2015  4868     rjpeter     Added logging of file being decoded.
* * * @author njensen @@ -56,6 +59,9 @@ public class GribDecoder implements Processor { private final IPerformanceStatusHandler perfLog = PerformanceStatus .getHandler(""); + private final IUFStatusHandler statusHandler = UFStatus + .getHandler(GribDecoder.class); + /** * @see org.apache.camel.Processor.process(Exchange) */ @@ -66,45 +72,47 @@ public class GribDecoder implements Processor { .getBody(); byte gribEdition = inMessage.getGribEdition(); exchange.getIn().setHeader("dataType", "grib" + gribEdition); + statusHandler.info("Decoding file: " + inMessage.getFileName()); - ITimer timer = TimeUtil.getTimer(); - GridRecord[] records = null; - timer.start(); - switch (gribEdition) { - case 1: - records = new Grib1Decoder().decode(inMessage); - break; - case 2: - records = new Grib2Decoder().decode(inMessage); - break; - default: - throw new GribException("Unknown grib version detected [" + ITimer timer = TimeUtil.getTimer(); + GridRecord[] records = null; + timer.start(); + switch (gribEdition) { + case 1: + records = new Grib1Decoder().decode(inMessage); + break; + case 2: + records = new Grib2Decoder().decode(inMessage); + break; + default: + throw new GribException("Unknown grib version detected [" + gribEdition + "] in file: [" + inMessage.getFileName() + "]"); - } + } - String datasetId = (String) headers.get("datasetid"); - String secondaryId = (String) headers.get("secondaryid"); - String ensembleId = (String) headers.get("ensembleid"); + String datasetId = (String) headers.get("datasetid"); + String secondaryId = (String) headers.get("secondaryid"); + String ensembleId = (String) headers.get("ensembleid"); - if (secondaryId != null || datasetId != null || ensembleId != null) { - for (GridRecord record : records) { - if (datasetId != null) { - record.setDatasetId(datasetId); - } - if (secondaryId != null) { - record.setSecondaryId(secondaryId); - } - if (ensembleId != null) { - record.setEnsembleId(ensembleId); - } - record.setDataURI(null); + if ((secondaryId != null) || (datasetId != null) + || (ensembleId != null)) { + for (GridRecord record : records) { + if (datasetId != null) { + record.setDatasetId(datasetId); } + if (secondaryId != null) { + record.setSecondaryId(secondaryId); + } + if (ensembleId != null) { + record.setEnsembleId(ensembleId); + } + record.setDataURI(null); } - timer.stop(); - perfLog.logDuration("Grib" + gribEdition + ": Time to Decode", - timer.getElapsedTime()); - exchange.getIn().setBody(records); + } + timer.stop(); + perfLog.logDuration("Grib" + gribEdition + ": Time to Decode", + timer.getElapsedTime()); + exchange.getIn().setBody(records); } diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribGridPointLock.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribGridPointLock.java index 164a4ade26..bdb10e8098 100644 --- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribGridPointLock.java +++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribGridPointLock.java @@ -38,7 +38,7 @@ import com.raytheon.uf.common.time.util.TimeUtil; /** * Object for tracking the number of grid points currently being processed by * the grib decode route. Grid points is used as an approximate measure of the - * amount of memory needed to decode the file, limiting the totla number of grid + * amount of memory needed to decode the file, limiting the total number of grid * points keeps memory usage more consistent. 
Before a file can be decoded the * message should pass through reserve to ensure the grib decoder has enough * free points. After decode the points should be released. @@ -50,7 +50,7 @@ import com.raytheon.uf.common.time.util.TimeUtil; * Date Ticket# Engineer Description * ------------- -------- ----------- -------------------------- * Oct 09, 2013 2402 bsteffen Initial creation - * + * Sep 14, 2015 4868 rjpeter Fix comment spelling. * * * @author bsteffen @@ -73,10 +73,9 @@ public class GribGridPointLock { private final AtomicLong currentPoints = new AtomicLong(); - private Queue waiters = new ConcurrentLinkedQueue(); + private final Queue waiters = new ConcurrentLinkedQueue(); - public GribGridPointLock(long maxConcurrentGridPoints, - int decodeThreadCount) { + public GribGridPointLock(long maxConcurrentGridPoints, int decodeThreadCount) { this.maxConcurrentGridPoints = maxConcurrentGridPoints; this.fairConcurrentGridPoints = maxConcurrentGridPoints / decodeThreadCount; @@ -138,7 +137,7 @@ public class GribGridPointLock { Object waiter = new Object(); synchronized (waiter) { waiters.offer(waiter); - while (waiters.peek() != waiter + while ((waiters.peek() != waiter) || !fastReserve(gridPoints, true)) { try { waiter.wait(15000); @@ -159,7 +158,7 @@ public class GribGridPointLock { long oldPoints = currentPoints.get(); long newPoints = oldPoints + gridPoints; while ((ignoreWaiters || waiters.isEmpty()) - && newPoints <= maxConcurrentGridPoints) { + && (newPoints <= maxConcurrentGridPoints)) { if (currentPoints.compareAndSet(oldPoints, newPoints)) { return true; } diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribPersister.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribPersister.java new file mode 100644 index 0000000000..9f7dd1dd51 --- /dev/null +++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/GribPersister.java @@ -0,0 +1,411 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. 
+ **/ +package com.raytheon.edex.plugin.grib; + +import java.io.File; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.camel.Headers; + +import com.raytheon.uf.common.dataplugin.PluginDataObject; +import com.raytheon.uf.common.dataplugin.grid.GridRecord; +import com.raytheon.uf.common.dataplugin.persist.IHDFFilePathProvider; +import com.raytheon.uf.common.status.IUFStatusHandler; +import com.raytheon.uf.common.status.UFStatus; +import com.raytheon.uf.common.util.SizeUtil; +import com.raytheon.uf.edex.core.EDEXUtil; +import com.raytheon.uf.edex.core.EdexException; +import com.raytheon.uf.edex.core.IContextStateProcessor; +import com.raytheon.uf.edex.database.plugin.PluginFactory; + +/** + * Class to persist grids asynchronously from decode. Grids queued by hdf5 file. + * + *
+ * <pre>
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Sep 10, 2015 4868       rjpeter     Initial creation
+ * 
+ * </pre>
+ * + * @author rjpeter + * @version 1.0 + */ + +public class GribPersister implements IContextStateProcessor { + private static final String HEADERS_ATTRIBUTE = "HEADERS"; + + private final IUFStatusHandler statusHandler = UFStatus + .getHandler(GribPersister.class); + + private final Map> gridsByFile = new LinkedHashMap<>(); + + private final Set inProcessFiles = new HashSet<>(); + + private volatile boolean running = true; + + public IHDFFilePathProvider pathProvider; + + private long maxBytesInMemory = 0; + + private long bytesInMemory = 0; + + private int gridsPending = 0; + + private int gridsInProcess = 0; + + private final GribPersistThread[] persistThreads; + + public GribPersister(String pluginName, String numThreads, + String maxGridsInMb) { + pathProvider = PluginFactory.getInstance().getPathProvider(pluginName); + int numPersistThreads = 0; + try { + numPersistThreads = Integer.parseInt(numThreads); + } catch (NumberFormatException e) { + // ignore + } + + if (numPersistThreads <= 0) { + numPersistThreads = 4; + statusHandler.warn("Invalid numThreads [" + numThreads + + "], using default of [" + numPersistThreads + "]"); + } + + int maxInMb = 0; + try { + maxInMb = Integer.parseInt(maxGridsInMb); + } catch (NumberFormatException e) { + // ignore + } + + if (maxInMb <= 0) { + maxInMb = 100; + statusHandler.warn("Invalid maxGridInMb [" + maxGridsInMb + + "], using default of [" + maxInMb + "]"); + } + + maxBytesInMemory = maxInMb * 1024l * 1024l; + + persistThreads = new GribPersistThread[numPersistThreads]; + for (int i = 0; i < persistThreads.length; i++) { + persistThreads[i] = new GribPersistThread(); + persistThreads[i].setName("GribPersist-" + (i + 1)); + persistThreads[i].start(); + } + } + + public void persist(@Headers Map headers, + GridRecord... records) { + boolean storeNow = true; + + if (records != null) { + storeNow = !addPendingRecords(headers, records); + } + + if (storeNow) { + try { + sendToEndpoint("persistIndex", records); + } catch (Exception e) { + statusHandler.error( + "Error occurred sending grids to persistIndex", e); + } + } + } + + /** + * Adds the records for storage by the GribPersistThreads. Return of true + * indicates grids will be stored, false otherwise. This is intended for + * shutdown scenario where the main threads need to store the final grids + * themselves. + * + * @param headers + * @param records + * @return True if records will be stored by the GribPersistThreads. 
+ */ + private boolean addPendingRecords(@Headers Map headers, + GridRecord[] records) { + if (records != null) { + StringBuilder path = new StringBuilder(); + synchronized (gridsByFile) { + if (!running) { + return false; + } + + for (GridRecord record : records) { + String plugin = record.getPluginName(); + path.setLength(0); + path.append(pathProvider.getHDFPath(plugin, record)) + .append(File.separatorChar) + .append(pathProvider.getHDFFileName(plugin, record)); + String filePath = path.toString(); + List recs = gridsByFile.get(filePath); + + if (recs == null) { + recs = new LinkedList<>(); + gridsByFile.put(filePath, recs); + } + + recs.add(record); + + /* + * since grids will be bulk stored by file, track the + * original headers for purposes of logging and stats + */ + record.addExtraAttribute(HEADERS_ATTRIBUTE, headers); + + // update bytesInMemory + bytesInMemory += ((float[]) record.getMessageData()).length * 4; + } + + gridsPending += records.length; + + // wake up any sleeping persist threads + gridsByFile.notifyAll(); + boolean logMessage = true; + + while (bytesInMemory > maxBytesInMemory) { + if (logMessage) { + statusHandler.info("Max Grids in memory for " + + getClass().getName() + + " exceeded. Waiting for grids to process"); + logMessage = false; + } + + try { + gridsByFile.wait(); + } catch (InterruptedException e) { + // ignore + } + } + } + + return true; + } + + return false; + } + + private void sendToEndpoint(String endpoint, PluginDataObject... pdos) + throws EdexException { + EDEXUtil.getMessageProducer().sendSync(endpoint, pdos); + } + + /** + * Handle case of multiple quadrants stored in one transaction. + * + * @param pdos + * @return + */ + public PluginDataObject[] eliminateDuplicates(PluginDataObject... pdos) { + if ((pdos != null) && (pdos.length > 1)) { + // dup elim by dataURI + Map pdoMap = new HashMap<>(pdos.length, 1); + for (PluginDataObject pdo : pdos) { + pdoMap.put(pdo.getDataURI(), pdo); + } + + if (pdoMap.size() < pdos.length) { + pdos = pdoMap.values().toArray( + new PluginDataObject[pdoMap.size()]); + } + } + + return pdos; + } + + /** + * This is called by the splitter to get an individual log statement per + * data_store file. The hdf5 and index is done on a bulk basis and the one + * to one relationship with data_store to log entry needs to be restored for + * proper tracking and statistics. The original header object is kept in the + * extraAttributes of the GridRecord. This is done in addPendingRecords. 
+ * + * @param record + * @param header + */ + public void updateLogHeader(GridRecord record, + @Headers Map header) { + @SuppressWarnings("unchecked") + Map recHeader = (Map) record + .getExtraAttributes().get(HEADERS_ATTRIBUTE); + String[] fieldsToCopy = new String[] { "dataType", "pluginName", + "ingestFileName", "dequeueTime", "enqueueTime" }; + for (String field : fieldsToCopy) { + Object val = recHeader.get(field); + if (val != null) { + header.put(field, val); + } + } + + } + + private class GribPersistThread extends Thread { + @Override + public void run() { + while (running) { + try { + String file = null; + List recordsToStore = null; + + synchronized (gridsByFile) { + while (running && gridsByFile.isEmpty()) { + try { + gridsByFile.wait(); + } catch (InterruptedException e) { + // ignore + } + } + + if (!gridsByFile.isEmpty()) { + Iterator iter = gridsByFile.keySet() + .iterator(); + while (iter.hasNext()) { + file = iter.next(); + if (!inProcessFiles.contains(file)) { + inProcessFiles.add(file); + recordsToStore = gridsByFile.get(file); + iter.remove(); + gridsPending -= recordsToStore.size(); + gridsInProcess += recordsToStore.size(); + break; + } + } + + if (recordsToStore == null) { + // all files currently storing on other threads + try { + gridsByFile.wait(); + } catch (InterruptedException e) { + // ignore + } + + continue; + } + } + } + + if (recordsToStore != null) { + long timeToStore = System.currentTimeMillis(); + try { + sendToEndpoint( + "gribPersistIndexAlert", + recordsToStore + .toArray(new PluginDataObject[recordsToStore + .size()])); + } catch (Exception e) { + /* + * TODO: Compile list of headers that this store + * affected. + */ + statusHandler.error( + "Error occurred persisting grids", e); + } + + timeToStore = System.currentTimeMillis() - timeToStore; + long bytesFree = 0; + for (GridRecord rec : recordsToStore) { + bytesFree += ((float[]) rec.getMessageData()).length * 4; + } + + int gridsLeft = 0; + int gridsStoringOnOtherThreads = 0; + long bytesUsedByGrids = 0; + + synchronized (gridsByFile) { + inProcessFiles.remove(file); + bytesInMemory -= bytesFree; + bytesUsedByGrids = bytesInMemory; + gridsInProcess -= recordsToStore.size(); + gridsStoringOnOtherThreads = gridsInProcess; + gridsLeft = gridsPending; + gridsByFile.notifyAll(); + } + + if (gridsLeft > 0) { + StringBuilder msg = new StringBuilder(80); + msg.append(gridsLeft) + .append((gridsLeft == 1 ? " grid " + : " grids ")) + .append("pending, ") + .append(gridsStoringOnOtherThreads) + .append((gridsStoringOnOtherThreads == 1 ? 
" grid " + : " grids ")) + .append("in process on other threads, ") + .append(SizeUtil + .prettyByteSize(bytesUsedByGrids)) + .append(" in memory."); + statusHandler.info(msg.toString()); + } + } + } catch (Throwable e) { + statusHandler.error( + "Unhandled error occurred persist grids", e); + } + } + } + } + + @Override + public void preStart() { + // NOOP + } + + @Override + public void postStart() { + // NOOP + } + + @Override + public void preStop() { + running = false; + + synchronized (gridsByFile) { + gridsByFile.notifyAll(); + if (gridsByFile.size() > 0) { + statusHandler.info("Waiting for " + gridsByFile.size() + + " hdf5 files to be persisted"); + } + } + + for (GribPersistThread thread : persistThreads) { + try { + thread.join(); + } catch (InterruptedException e) { + // ignore + } + } + } + + @Override + public void postStop() { + // NOOP + } +} diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java index 8b5205a7dc..030817a673 100644 --- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java +++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java @@ -22,11 +22,7 @@ package com.raytheon.edex.plugin.grib.decoderpostprocessors; import java.io.File; import java.io.FilenameFilter; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -36,10 +32,7 @@ import com.raytheon.edex.plugin.grib.exception.GribException; import com.raytheon.edex.plugin.grib.spatial.GribSpatialCache; import com.raytheon.edex.util.Util; import com.raytheon.edex.util.grib.CompositeModel; -import com.raytheon.uf.common.dataplugin.PluginDataObject; -import com.raytheon.uf.common.dataplugin.grid.GridConstants; import com.raytheon.uf.common.dataplugin.grid.GridRecord; -import com.raytheon.uf.common.datastorage.records.FloatDataRecord; import com.raytheon.uf.common.gridcoverage.GridCoverage; import com.raytheon.uf.common.localization.IPathManager; import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel; @@ -50,19 +43,9 @@ import com.raytheon.uf.common.serialization.SingleTypeJAXBManager; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; -import com.raytheon.uf.common.time.util.TimeUtil; -import com.raytheon.uf.common.util.CollectionUtil; import com.raytheon.uf.common.util.FileUtil; -import com.raytheon.uf.common.util.GridUtil; import com.raytheon.uf.common.util.file.FilenameFilters; -import com.raytheon.uf.edex.core.EDEXUtil; -import com.raytheon.uf.edex.core.IContextStateProcessor; -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; -import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; -import com.raytheon.uf.edex.database.cluster.ClusterTask; -import com.raytheon.uf.edex.database.plugin.DataURIDatabaseUtil; -import com.raytheon.uf.edex.database.plugin.PluginFactory; -import com.raytheon.uf.edex.plugin.grid.dao.GridDao; +import com.raytheon.uf.edex.plugin.grid.PartialGrid; /** * The EnsembleGridAssembler class is part of the ingest process for grib data. 
@@ -86,34 +69,19 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao; * Apr 21, 2014 2060 njensen Remove dependency on grid dataURI column * Jul 21, 2014 3373 bclement JAXB manager api changes * Aug 18, 2014 4360 rferrel Set secondaryId in {@link #createAssembledRecord(GridRecord, CompositeModel)} - * Sep 09, 2015 4868 rjpeter Move grid assembly to a dedicated thread. + * Sep 09, 2015 4868 rjpeter Updated to be stored in partial grids as part of normal route. * * * @author bphillip * @version 1 */ -public class EnsembleGridAssembler implements IDecoderPostProcessor, - IContextStateProcessor { +public class EnsembleGridAssembler implements IDecoderPostProcessor { private static final transient IUFStatusHandler statusHandler = UFStatus .getHandler(EnsembleGridAssembler.class); /** The map of the models that come in sections */ private static final Map thinnedModels = new HashMap<>();; - private static final String CLUSTER_TASK_NAME = "EnsembleGrid"; - - private static final Map> pendingRecords = new LinkedHashMap<>(); - - private static long maxBytesInMemory = 0; - - private static long bytesInMemory = 0; - - private static GridAssembler[] assemblerThreads = null; - - private static int numAssemblerThreads = 1; - - private static volatile boolean running = true; - static { loadThinnedModels(); } @@ -155,50 +123,20 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor, } } - public void setMaxGridsInMB(int maxInMB) { - maxBytesInMemory = maxInMB * 1024 * 1024; - } - - public void setNumAssemblerThreads(int numThreads) { - if (numThreads > 0) { - numAssemblerThreads = numThreads; - } else { - statusHandler - .error("Number of assembler threads must be > 0, keeping previous value of " - + numAssemblerThreads); - } - } - @Override public GridRecord[] process(GridRecord rec) throws GribException { CompositeModel compositeModel = getCompositeModel(rec.getDatasetId()); - GridRecord assembledRecord = createAssembledRecord(rec, compositeModel); if (compositeModel != null) { - if (!addPendingRecord(rec, assembledRecord)) { - // in shutdown scenaro, store immeidately - String lockName = assembledRecord.getDataURI(); - ClusterTask ct = null; - try { - do { - ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, lockName, - 120000, true); - } while (!LockState.SUCCESSFUL.equals(ct.getLockState())); - - processGrids(assembledRecord, Arrays.asList(rec)); - } catch (Exception e) { - throw new GribException("Error processing assembled grid", - e); - } finally { - if (ct != null) { - ClusterLockUtils.deleteLock(ct.getId().getName(), ct - .getId().getDetails()); - } - } + GridRecord assembledRecord = createAssembledRecord(rec, + compositeModel); + GridRecord wrapRecord = setPartialGrid(compositeModel, + assembledRecord, rec); + if (wrapRecord == null) { + return new GridRecord[] { assembledRecord }; } - // grid was assembled in to a larger grid, discard original - return new GridRecord[0]; + return new GridRecord[] { assembledRecord, wrapRecord }; } // wasn't a grid to be assembled @@ -230,107 +168,11 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor, newRecord.setLocation(coverage); newRecord.setDatasetId(thinned.getModelName()); + newRecord.setOverwriteAllowed(true); return newRecord; } - /** - * Adds a record to the queue to be assembled by the grid assembler threads. - * Returns true if the grid was added to the queue, false if the add failed - * and the grid should be assembled by the caller. 
- * - * @param pendingRecord - * @param assembledRecord - * @return - */ - private boolean addPendingRecord(GridRecord pendingRecord, - GridRecord assembledRecord) { - GridCoverage coverage = pendingRecord.getLocation(); - long pointsInGrid = coverage.getNx() * coverage.getNy(); - - synchronized (pendingRecords) { - /* shutting down, record should be stored by calling thread */ - if (!running) { - return false; - } - - if (assemblerThreads == null) { - /* - * Start assembler threads if they haven't been started yet - */ - assemblerThreads = new GridAssembler[numAssemblerThreads]; - for (int i = 0; i < assemblerThreads.length; i++) { - assemblerThreads[i] = new GridAssembler("GridAssembler-" - + (i + 1)); - assemblerThreads[i].start(); - } - } - - boolean logMessage = true; - - bytesInMemory += pointsInGrid * 4; - List pendingList = pendingRecords.get(assembledRecord); - if (pendingList == null) { - pendingList = new ArrayList<>(4); - pendingRecords.put(assembledRecord, pendingList); - } - - pendingList.add(pendingRecord); - pendingRecords.notifyAll(); - - while (bytesInMemory > maxBytesInMemory) { - if (logMessage) { - statusHandler.info("Max Grids in " + getClass().getName() - + " exceeded. Waiting for grids to process"); - logMessage = false; - } - - try { - pendingRecords.wait(); - } catch (InterruptedException e) { - // ignore - } - } - } - - return true; - } - - /** - * Processes a single GridRecord - * - * @param record - * The GridRecord to process - * @param thinned - * The composite model for which the GridRecord is a part of - * @return The new grib record - * @throws Exception - */ - private void processGrids(GridRecord assembledRecord, - List recordsToAssemble) throws Exception { - boolean exists = DataURIDatabaseUtil.existingDataURI(assembledRecord); - GridDao dao = (GridDao) PluginFactory.getInstance().getPluginDao( - GridConstants.GRID); - - if (!exists) { - GridCoverage coverage = assembledRecord.getLocation(); - float[] data = new float[coverage.getNx() * coverage.getNy()]; - Arrays.fill(data, GridUtil.GRID_FILL_VALUE); - assembledRecord.setMessageData(data); - } else { - FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data( - assembledRecord, -1)[0]; - assembledRecord.setMessageData(rec.getFloatData()); - } - - mergeData(assembledRecord, recordsToAssemble); - assembledRecord.setOverwriteAllowed(true); - assembledRecord.setInsertTime(TimeUtil.newGmtCalendar()); - dao.persistRecords(assembledRecord); - EDEXUtil.getMessageProducer().sendSync("notificationAggregation", - new PluginDataObject[] { assembledRecord }); - } - /** * Merges the data from a GridRecord into the composite GridRecord * @@ -342,225 +184,99 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor, * The composite model definition * @throws GribException */ - private void mergeData(GridRecord assembledRecord, - List recordsToAssemble) throws GribException { - CompositeModel thinned = thinnedModels.get(assembledRecord - .getDatasetId()); - GridCoverage assembledCoverage = assembledRecord.getLocation(); - float[][] assembledData = Util.resizeDataTo2D( - (float[]) assembledRecord.getMessageData(), - assembledCoverage.getNx(), assembledCoverage.getNy()); + private GridRecord setPartialGrid(CompositeModel thinned, + GridRecord assembledRecord, GridRecord recordToAssemble) + throws GribException { + PartialGrid pGrid = new PartialGrid(); - for (GridRecord record : recordsToAssemble) { - String modelName = record.getDatasetId(); - GridCoverage coverage = record.getLocation(); + String modelName = 
recordToAssemble.getDatasetId(); + GridCoverage coverage = recordToAssemble.getLocation(); - int nx = coverage.getNx(); - int ny = coverage.getNy(); - - List compModels = thinned.getModelList(); - - int modIndex = compModels.indexOf(modelName); - if (modIndex == -1) { - /* - * Shouldn't be possible since was how it was found in the first - * place - */ - throw new GribException( - "Error assembling grids. Thinned grid definition does not contain " - + modelName); - } + int nx = coverage.getNx(); + int ny = coverage.getNy(); + pGrid.setNx(nx); + pGrid.setNy(ny); + /* + * TODO: This should map the UL corner of recordToAssemble to + * assembledRecord instead of relying on index in list + */ + List compModels = thinned.getModelList(); + int modIndex = compModels.indexOf(modelName); + if (modIndex == -1) { /* - * TODO: This should map the UL corner of record to assembledRecord - * instead of relying on index in list + * Shouldn't be possible since was how it was found in the first + * place */ - Util.insertSubgrid(assembledData, Util.resizeDataTo2D( - (float[]) record.getMessageData(), coverage.getNx(), - coverage.getNy()), (nx * modIndex) - modIndex, 0, nx, ny); + throw new GribException( + "Error assembling grids. Thinned grid definition does not contain " + + modelName); } - assembledRecord.setMessageData(Util.resizeDataTo1D(assembledData, - assembledCoverage.getNy(), assembledCoverage.getNx())); - } + pGrid.setxOffset((nx * modIndex) - modIndex); + pGrid.setyOffset(0); + assembledRecord.addExtraAttribute(PartialGrid.KEY, pGrid); + assembledRecord.setMessageData(recordToAssemble.getMessageData()); - @Override - public void preStart() { - // null op - } - - @Override - public void postStart() { - // null op - } - - @Override - public void preStop() { - running = false; - - if (assemblerThreads != null) { - - synchronized (pendingRecords) { - pendingRecords.notifyAll(); - if (pendingRecords.size() > 0) { - statusHandler.info("Waiting for " + pendingRecords.size() - + " grids to be assembled"); - } - } - - for (GridAssembler assembler : assemblerThreads) { - try { - assembler.join(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - - /* - * (non-Javadoc) - * - * @see com.raytheon.uf.edex.core.IContextStateProcessor#postStop() - */ - @Override - public void postStop() { + return checkWorldWrap(assembledRecord); } /** - * Thread to assemble sectorized grids into the overall parent grid. + * Checks if assembledRecord's partial grid has extra columns that wrap + * around. This is due to partial grids having 1 column of overlap between + * each partial grid. 
+ * + * @param assembledRecord + * @return */ - private class GridAssembler extends Thread { - public GridAssembler(String name) { - super(name); + private GridRecord checkWorldWrap(GridRecord assembledRecord) { + GridCoverage assembledCoverage = assembledRecord.getLocation(); + int assembledNx = assembledCoverage.getNx(); + PartialGrid pGrid = (PartialGrid) assembledRecord + .getExtraAttribute(PartialGrid.KEY); + int xOffset = pGrid.getxOffset(); + int nx = pGrid.getNx(); + int ny = pGrid.getNy(); + + // check world wrap due to overlapping columns + if ((xOffset + nx) > assembledNx) { + float[] messageData = (float[]) assembledRecord.getMessageData(); + float[][] data2D = Util.resizeDataTo2D(messageData, nx, ny); + + // cut off extra data from assembledRecord + int newNx = assembledNx - xOffset; + pGrid.setNx(newNx); + assembledRecord.setMessageData(trimGridAndMake1D(data2D, 0, 0, + newNx, ny)); + + // make a secondary record for the wrap amount + GridRecord wrappedRecord = new GridRecord(assembledRecord); + PartialGrid wrappedPartial = new PartialGrid(); + wrappedPartial.setxOffset(0); + wrappedPartial.setyOffset(0); + wrappedPartial.setNx(nx - newNx); + wrappedPartial.setNy(ny); + wrappedRecord.addExtraAttribute(PartialGrid.KEY, wrappedPartial); + wrappedRecord.setMessageData(trimGridAndMake1D(data2D, newNx, 0, + wrappedPartial.getNx(), ny)); + wrappedRecord.setOverwriteAllowed(true); + return wrappedRecord; } - @Override - public void run() { - boolean keepProcessing = running; - long timeToAssembleGrid = 0; - do { - GridRecord compositeRecord = null; - List recordsToAssemble = null; - ClusterTask ct = null; + return null; + } - try { - int index = 0; - do { - compositeRecord = getNextRecord(index); + private float[] trimGridAndMake1D(float[][] data, int xOffset, int yOffset, + int nx, int ny) { + float[][] rval = new float[ny][nx]; - /* - * check compositeRecord in case a shutdown was - * triggered - */ - if (compositeRecord != null) { - String lockName = compositeRecord.getDataURI(); - ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, - lockName, 120000, false); - - if (!LockState.SUCCESSFUL.equals(ct.getLockState())) { - index++; - continue; - } - - synchronized (pendingRecords) { - recordsToAssemble = pendingRecords - .remove(compositeRecord); - } - } - } while ((recordsToAssemble == null) && running); - - if (recordsToAssemble != null) { - long t0 = System.currentTimeMillis(); - processGrids(compositeRecord, recordsToAssemble); - timeToAssembleGrid = System.currentTimeMillis() - t0; - } - } catch (Throwable e) { - statusHandler.error( - "Uncaught exception while assembling grids", e); - } finally { - if (ct != null) { - /* - * lock is time based, need to delete lock instead of - * just unlocking - */ - ClusterLockUtils.deleteLock(ct.getId().getName(), ct - .getId().getDetails()); - } - - if (!CollectionUtil.isNullOrEmpty(recordsToAssemble)) { - long points = 0; - for (GridRecord rec : recordsToAssemble) { - GridCoverage location = rec.getLocation(); - points += location.getNx() * location.getNy(); - } - - int remaining = 0; - - synchronized (pendingRecords) { - bytesInMemory -= points * 4; - pendingRecords.notifyAll(); - remaining = pendingRecords.size(); - } - - int count = recordsToAssemble.size(); - StringBuilder msg = new StringBuilder(80); - msg.append("Took ").append(timeToAssembleGrid) - .append("ms to merge ").append(count) - .append((count == 1 ? " grid. " : " grids. ")) - .append(remaining) - .append((remaining == 1 ? 
" grid" : " grids")) - .append(" remaining to merge."); - statusHandler.info(msg.toString()); - } - } - - keepProcessing = running; - if (!keepProcessing) { - synchronized (pendingRecords) { - keepProcessing = !pendingRecords.isEmpty(); - } - } - } while (keepProcessing); - } - - /** - * Returns the next record to be assembled. Index allows for skipping of - * records in case the lock is being held by other threads/processes. - * - * @param index - * @return - */ - private GridRecord getNextRecord(int index) { - GridRecord rval = null; - - synchronized (pendingRecords) { - // avoid holding lock for extended period - while (pendingRecords.isEmpty() && running) { - try { - pendingRecords.wait(); - } catch (InterruptedException e) { - // ignore - } - } - - if (pendingRecords.size() > 0) { - Iterator iter = pendingRecords.keySet() - .iterator(); - index %= pendingRecords.size(); - - // skip previously checked entries - for (int i = 0; i < index; i++) { - iter.next(); - } - - rval = iter.next(); - } + for (int row = 0; row < ny; row++) { + for (int col = 0; col < nx; col++) { + rval[row][col] = data[yOffset + row][xOffset + col]; } - - return rval; } + return Util.resizeDataTo1D(rval, ny, nx); } } diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.grid/META-INF/MANIFEST.MF b/edexOsgi/com.raytheon.uf.edex.plugin.grid/META-INF/MANIFEST.MF index 1fc6f5bcc9..d7547e1b1e 100644 --- a/edexOsgi/com.raytheon.uf.edex.plugin.grid/META-INF/MANIFEST.MF +++ b/edexOsgi/com.raytheon.uf.edex.plugin.grid/META-INF/MANIFEST.MF @@ -2,9 +2,9 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Grid Bundle-SymbolicName: com.raytheon.uf.edex.plugin.grid -Bundle-Version: 1.0.0.qualifier +Bundle-Version: 1.15.0.qualifier Bundle-Vendor: RAYTHEON -Bundle-RequiredExecutionEnvironment: JavaSE-1.6 +Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Import-Package: com.raytheon.uf.common.inventory.tree, com.raytheon.uf.common.dataplugin, com.raytheon.uf.common.dataplugin.persist, @@ -17,9 +17,9 @@ Import-Package: com.raytheon.uf.common.inventory.tree, com.raytheon.uf.edex.database.cluster, com.raytheon.uf.edex.database.dao, com.raytheon.uf.edex.database.plugin, - com.raytheon.uf.edex.database.query, - org.apache.commons.logging -Export-Package: com.raytheon.uf.edex.plugin.grid.dao, + com.raytheon.uf.edex.database.query +Export-Package: com.raytheon.uf.edex.plugin.grid, + com.raytheon.uf.edex.plugin.grid.dao, com.raytheon.uf.edex.plugin.grid.handler Require-Bundle: com.raytheon.uf.common.parameter;bundle-version="1.0.0", com.raytheon.uf.common.geospatial;bundle-version="1.12.1174", diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/PartialGrid.java b/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/PartialGrid.java new file mode 100644 index 0000000000..a3b09c555a --- /dev/null +++ b/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/PartialGrid.java @@ -0,0 +1,108 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. 
+ * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.edex.plugin.grid; + +/** + * Designates that the messageData of the GridRecord is only a partial grid. + * + *
+ * <pre>
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Sep 11, 2015 4868       rjpeter     Initial creation
+ * 
+ * </pre>
+ * + * @author rjpeter + * @version 1.0 + */ +public class PartialGrid { + public static final String KEY = PartialGrid.class.getSimpleName(); + + private int nx; + + private int ny; + + private int xOffset; + + private int yOffset; + + /** + * @return the nx + */ + public int getNx() { + return nx; + } + + /** + * @param nx + * the nx to set + */ + public void setNx(int nx) { + this.nx = nx; + } + + /** + * @return the ny + */ + public int getNy() { + return ny; + } + + /** + * @param ny + * the ny to set + */ + public void setNy(int ny) { + this.ny = ny; + } + + /** + * @return the xOffset + */ + public int getxOffset() { + return xOffset; + } + + /** + * @param xOffset + * the xOffset to set + */ + public void setxOffset(int xOffset) { + this.xOffset = xOffset; + } + + /** + * @return the yOffset + */ + public int getyOffset() { + return yOffset; + } + + /** + * @param yOffset + * the yOffset to set + */ + public void setyOffset(int yOffset) { + this.yOffset = yOffset; + } +} diff --git a/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/dao/GridDao.java b/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/dao/GridDao.java index eb16b29325..c447f807ca 100644 --- a/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/dao/GridDao.java +++ b/edexOsgi/com.raytheon.uf.edex.plugin.grid/src/com/raytheon/uf/edex/plugin/grid/dao/GridDao.java @@ -55,11 +55,13 @@ import com.raytheon.uf.common.gridcoverage.lookup.GridCoverageLookup; import com.raytheon.uf.common.parameter.Parameter; import com.raytheon.uf.common.parameter.lookup.ParameterLookup; import com.raytheon.uf.common.status.UFStatus.Priority; +import com.raytheon.uf.common.util.GridUtil; import com.raytheon.uf.edex.core.EDEXUtil; import com.raytheon.uf.edex.core.EdexException; import com.raytheon.uf.edex.core.dataplugin.PluginRegistry; import com.raytheon.uf.edex.database.DataAccessLayerException; import com.raytheon.uf.edex.database.plugin.PluginDao; +import com.raytheon.uf.edex.plugin.grid.PartialGrid; /** * Data access object for accessing Grid records from the database @@ -112,8 +114,23 @@ public class GridDao extends PluginDao { group = "/" + location.getId(); datasetName = abbrev; } - AbstractStorageRecord storageRecord = new FloatDataRecord( - datasetName, group, (float[]) messageData, 2, sizes); + AbstractStorageRecord storageRecord = null; + Object partialGrid = gridRec.getExtraAttribute(PartialGrid.KEY); + if ((partialGrid != null) && (partialGrid instanceof PartialGrid)) { + /* Check if dataset needs to be created */ + PartialGrid pGrid = (PartialGrid) partialGrid; + long[] pGridSize = new long[] { pGrid.getNx(), pGrid.getNy() }; + long[] pGridOffset = new long[] { pGrid.getxOffset(), + pGrid.getyOffset() }; + storageRecord = new FloatDataRecord(datasetName, group, + (float[]) messageData, 2, pGridSize); + storageRecord.setMinIndex(pGridOffset); + storageRecord.setMaxSizes(sizes); + storageRecord.setFillValue(GridUtil.GRID_FILL_VALUE); + } else { + storageRecord = new FloatDataRecord(datasetName, group, + (float[]) messageData, 2, sizes); + } storageRecord.setCorrelationObject(gridRec); StorageProperties sp = new StorageProperties(); diff --git a/pythonPackages/pypies/pypies/impl/H5pyDataStore.py b/pythonPackages/pypies/pypies/impl/H5pyDataStore.py index cc6ef7272d..e25f10912a 100644 --- a/pythonPackages/pypies/pypies/impl/H5pyDataStore.py +++ b/pythonPackages/pypies/pypies/impl/H5pyDataStore.py @@ -43,6 +43,8 @@ # Jul 27, 2015 4402 njensen Set 
 #                                     fill_time_never on write if fill value is None
 # Jul 30, 2015  1574      nabowle     Add deleteOrphanFiles()
 # Aug 20, 2015  DR 17726  mgamazaychikov Remove __doMakeReadable method
+# Sep 14, 2015  4868      rjpeter     Updated writePartialHDFData to create the dataset if
+#                                     it doesn't exist.
 #
 
 import h5py, os, numpy, pypies, re, logging, shutil, time, types, traceback
@@ -140,8 +142,9 @@ class H5pyDataStore(IDataStore.IDataStore):
         rootNode=f['/']
         group = self.__getNode(rootNode, record.getGroup(), None, create=True)
         if record.getMinIndex() is not None and len(record.getMinIndex()):
-            ss = self.__writePartialHDFDataset(f, data, record.getDimension(), record.getSizes(),
-                                               group[record.getName()], props, record.getMinIndex())
+            ss = self.__writePartialHDFDataset(f, data, record.getDimension(), record.getSizes(), record.getName(),
+                                               group, props, self.__getHdf5Datatype(record), record.getMinIndex(),
+                                               record.getMaxSizes(), record.getFillValue())
         else:
             ss = self.__writeHDFDataset(f, data, record.getDimension(), record.getSizes(), record.getName(),
                                         group, props, self.__getHdf5Datatype(record), storeOp, record)
@@ -167,7 +170,7 @@
         data = data.reshape(szDims1)
 
         ss = {}
-        if dataset in group.keys():
+        if dataset in group:
             ds = group[dataset]
             if storeOp == 'STORE_ONLY':
                 raise StorageException('Dataset ' + str(dataset) + ' already exists in group ' + str(group))
@@ -217,6 +220,12 @@
         #dtype.set_strpad(h5t.STR_NULLTERM)
         return dtype
 
+    # Common use case of arrays are passed in x/y and orientation of data is y/x
+    def __reverseDimensions(self, dims):
+        revDims = [None, ] * len(dims)
+        for i in range(len(dims)):
+            revDims[i] = dims[len(dims) - i - 1]
+        return revDims
 
     def __calculateChunk(self, nDims, dataType, storeOp, maxDims):
         if nDims == 1:
@@ -253,6 +262,12 @@
         else:
             raise NotImplementedException("Storage of " + str(nDims) + " dimensional " + \
                                           "data with mode " + storeOp + " not supported yet")
+
+        # ensure chunk is not bigger than dimensions
+        if maxDims is not None:
+            for i in range(nDims):
+                chunk[i] = chunk[i] if maxDims[i] is None else min(chunk[i], maxDims[i])
+
         chunk = tuple(chunk)
         return chunk
@@ -262,28 +277,38 @@
         for key in attrs:
             dataset.attrs[key] = attrs[key]
 
-    def __writePartialHDFDataset(self, f, data, dims, szDims, ds, props,
-                                 minIndex):
-        # reverse sizes for hdf5
-        szDims1 = [None, ] * len(szDims)
-        for i in range(len(szDims)):
-            szDims1[i] = szDims[len(szDims) - i - 1]
-        offset = [None, ] * len(minIndex)
-        for i in range(len(minIndex)):
-            offset[i] = minIndex[len(minIndex) - i - 1]
+    def __writePartialHDFDataset(self, f, data, dims, szDims, dataset, group, props, dataType,
+                                 minIndex, maxSizes, fillValue):
+        # Change dimensions to be Y/X
+        szDims1 = self.__reverseDimensions(szDims)
+        offset = self.__reverseDimensions(minIndex)
 
-        # process chunking
-#        chunkSize = None
-#        if data.dtype != numpy._string and data.dtype != numpy._object:
-#            chunkSize = DEFAULT_CHUNK_SIZE
-#        else:
-#            chunkSize = 1
-#        chunk = [chunkSize] * len(szDims)
         data = data.reshape(szDims1)
+
+        ss = {}
+        if dataset in group:
+            ds=group[dataset]
+            ss['op'] = 'REPLACE'
+        else:
+            if maxSizes is None:
+                raise StorageException('Dataset ' + dataset + ' does not exist for partial write. MaxSizes not specified to create initial dataset')
+
+            maxDims = self.__reverseDimensions(maxSizes)
+            nDims = len(maxDims)
+            chunk = self.__calculateChunk(nDims, dataType, 'STORE_ONLY', maxDims)
+            compression = None
+            if props:
+                compression = props.getCompression()
+            ds = self.__createDatasetInternal(group, dataset, dataType, maxDims, None, chunk, compression, fillValue)
+            ss['op'] = 'STORE_ONLY'
+
+        if ds.shape[0] < data.shape[0] or ds.shape[1] < data.shape[1]:
+            raise StorageException('Partial write larger than original dataset. Original shape [' + str(ds.shape) + '], partial ')
+
         endIndex = [offset[0] + szDims1[0], offset[1] + szDims1[1]]
         ds[offset[0]:endIndex[0], offset[1]:endIndex[1]] = data
-        return {'op':'REPLACE'}
+
+        return ss
 
 
     def delete(self, request):
@@ -349,7 +374,7 @@
 
     # recursively looks for data sets
     def __hasDataSet(self, group):
-        for key in group.keys():
+        for key in group:
             child=group[key]
             if type(child) == h5py.highlevel.Dataset:
                 return True
@@ -509,9 +534,7 @@
 
         # reverse sizes for hdf5
         szDims = rec.getSizes()
-        szDims1 = [None, ] * len(szDims)
-        for i in range(len(szDims)):
-            szDims1[i] = szDims[len(szDims) - i - 1]
+        szDims1 = self.__reverseDimensions(szDims)
         szDims = tuple(szDims1)
 
         chunks = None
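
The hunks above only show GribGridPointLock's internals (fastReserve and the constructor), while its javadoc describes the intended protocol: reserve the file's grid-point count before decode, release it after. A minimal usage sketch under that reading; the reserve/release names, the message accessor, and the decode stub are assumptions, not code from this patch:

import com.raytheon.edex.plugin.grib.GribGridPointLock;

/** Sketch: how a decode thread is expected to bracket work with the lock. */
public class GridPointLockUsageSketch {

    // 12,000,000 shared points across 4 decode threads, matching the new
    // grib-decode.count.gridpoints and grib-decode.count.threads defaults.
    private final GribGridPointLock lock = new GribGridPointLock(12000000L, 4);

    public void decodeWithReservation(GribDecodeMessage message) {
        // grid points approximate the memory footprint of the decode
        long gridPoints = message.getGridPointCount(); // assumed accessor
        lock.reserve(gridPoints); // assumed name; blocks until points are free
        try {
            // ... run Grib1Decoder/Grib2Decoder here ...
        } finally {
            lock.release(gridPoints); // "after decode the points should be released"
        }
    }
}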
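The new GribPersister decouples decode from HDF5 writes with a memory-bounded queue keyed by destination file: each queued grid counts 4 bytes per float against grib-persister.count.mb (200 MB by default, roughly four of the 12,000,000-point maximum decodes), and producers block on the map's monitor until persist threads free space. A simplified, self-contained sketch of that pattern follows; the real class also tracks in-process files so two threads never write the same hdf5 file, and it releases the byte count only after the store completes:

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Simplified sketch of GribPersister's queue: grids grouped by file,
 * 4 bytes per float counted against the cap, producers blocked on the
 * map's monitor while over the cap.
 */
public class BoundedFileQueueSketch {

    private final Map<String, List<float[]>> byFile = new LinkedHashMap<String, List<float[]>>();

    private final long maxBytes = 200L * 1024 * 1024; // grib-persister.count.mb=200

    private long bytesInMemory = 0;

    /** Producer side: queue a grid, then block while over the memory cap. */
    public void offer(String file, float[] grid) throws InterruptedException {
        synchronized (byFile) {
            List<float[]> list = byFile.get(file);
            if (list == null) {
                list = new LinkedList<float[]>();
                byFile.put(file, list);
            }
            list.add(grid);
            bytesInMemory += grid.length * 4L;
            byFile.notifyAll(); // wake sleeping persist threads
            while (bytesInMemory > maxBytes) {
                byFile.wait(); // wait for a store to free memory
            }
        }
    }

    /** Consumer side: take every grid queued for one file. */
    public List<float[]> take() throws InterruptedException {
        synchronized (byFile) {
            while (byFile.isEmpty()) {
                byFile.wait();
            }
            String file = byFile.keySet().iterator().next();
            List<float[]> list = byFile.remove(file);
            for (float[] grid : list) {
                bytesInMemory -= grid.length * 4L;
            }
            byFile.notifyAll(); // unblock producers waiting on the cap
            return list;
        }
    }
}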
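The PartialGrid offset math in EnsembleGridAssembler.setPartialGrid places section i of width nx at xOffset = (nx * i) - i, because adjacent sections share one column. On a grid that wraps in longitude, the last section can then run past the assembled width, which is the case checkWorldWrap splits into a trimmed record plus a wrapped record at offset 0. A worked example with made-up dimensions (no real thinned-model numbers are implied):

/**
 * Worked example of the PartialGrid offset math (illustrative numbers only).
 * Section i of width nx starts at xOffset = (nx * i) - i because adjacent
 * sections share one column.
 */
public class PartialGridOffsetSketch {

    public static void main(String[] args) {
        int nx = 93;       // width of one section, including the shared column
        int ny = 181;      // height (unused by the offset math)
        int sections = 4;
        // hypothetical world-wrapping grid: one extra overlap closes the circle
        int assembledNx = (nx * sections) - sections; // 368

        for (int i = 0; i < sections; i++) {
            int xOffset = (nx * i) - i;
            if ((xOffset + nx) > assembledNx) {
                // checkWorldWrap case: trim, then emit a second record at x=0
                int newNx = assembledNx - xOffset;
                System.out.println("section " + i + ": offset " + xOffset
                        + ", " + newNx + "x" + ny + " plus " + (nx - newNx)
                        + " wrapped column(s) at offset 0");
            } else {
                System.out.println("section " + i + ": offset " + xOffset
                        + ", " + nx + "x" + ny);
            }
        }
    }
}

With these numbers, sections 0-2 land at offsets 0, 92, and 184 intact, while section 3 (offset 276) keeps 92 columns and contributes 1 wrapped column back at offset 0.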
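On the storage side, GridDao describes a partial grid to pypies entirely through existing FloatDataRecord metadata: the record's sizes carry the sub-grid shape, minIndex the offset, maxSizes the full dataset shape (so __writePartialHDFDataset can create the dataset on first write), and the fill value initializes cells no section has written yet; pypies then reverses the dimensions to Y/X and writes the region with ds[y0:y1, x0:x1] = data. A sketch of building such a record, with hypothetical sizes, dataset name, and group (GridDao derives the real ones from the parameter abbreviation and coverage id):

import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.util.GridUtil;

/** Sketch of the partial-grid storage record GridDao builds above. */
public class PartialRecordSketch {

    public static FloatDataRecord partialRecord(float[] sectionData) {
        long[] sectionSize = { 92, 181 }; // nx, ny of this section (example)
        long[] offset = { 276, 0 };       // xOffset, yOffset within the full grid
        long[] fullSize = { 368, 181 };   // nx, ny of the assembled grid

        // "Data" and "/12345" are placeholders for the real name and group
        FloatDataRecord rec = new FloatDataRecord("Data", "/12345", sectionData,
                2, sectionSize);
        rec.setMinIndex(offset);     // where the section lands in the dataset
        rec.setMaxSizes(fullSize);   // lets pypies create the dataset on first write
        rec.setFillValue(GridUtil.GRID_FILL_VALUE); // initializes unwritten cells
        return rec;
    }
}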