Omaha #4868 - Add grib persist threads, make grid stitching inline

Change-Id: I5f96fa34dc49034d89bc018ad20b1901ed74b021

Former-commit-id: afaaf5730eb1c0a7f089d07d077ddf0fc8f7617c
Richard Peter 2015-09-14 16:38:30 -05:00
parent fbdce3bede
commit f985b6882a
11 changed files with 758 additions and 453 deletions

View file

@ -19,7 +19,7 @@
# further licensing information.
##
export INIT_MEM=128 # in Meg
export MAX_MEM=768 # in Meg
export MAX_MEM=1024 # in Meg
export METADATA_POOL_MAX=10
export EDEX_DEBUG_PORT=5007

View file

@ -29,14 +29,15 @@
class="com.raytheon.edex.plugin.grib.spatial.GribSpatialCache"
factory-method="getInstance" depends-on="gridcoveragelookup"/>
<bean id="gridAssembler" class="com.raytheon.edex.plugin.grib.decoderpostprocessors.EnsembleGridAssembler">
<property name="numAssemblerThreads" value="${grib-assembler.count.threads}"/>
<property name="maxGridsInMB" value="${grib-assembler.count.mb}"/>
<bean id="gribPersister" class="com.raytheon.edex.plugin.grib.GribPersister">
<constructor-arg value="grid"/>
<constructor-arg value="${grib-persister.count.threads}"/>
<constructor-arg value="${grib-persister.count.mb}"/>
</bean>
<bean factory-bean="contextManager" factory-method="registerContextStateProcessor">
<constructor-arg ref="grib-decode"/>
<constructor-arg ref="gridAssembler"/>
<constructor-arg ref="gribPersister"/>
</bean>
<camelContext id="grib-decode" xmlns="http://camel.apache.org/schema/spring"
@ -75,7 +76,7 @@
<!-- send for processing -->
<bean ref="gribPostProcessor" method="process" />
<to uri="direct-vm:persistIndexAlert" />
<bean ref="gribPersister" method="persist"/>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
@ -86,5 +87,27 @@
</doFinally>
</doTry>
</route>
<!--
Copy of persist route with a callback to GribPersister for logging.
-->
<route id="gribPersistIndexAlert">
<from uri="direct-vm:gribPersistIndexAlert"/>
<bean ref="persist" method="persist"/>
<multicast>
<pipeline>
<!-- Separate index and logging as the index needs to
eliminate duplicates for the case of stitched grids -->
<bean ref="gribPersister" method="eliminateDuplicates"/>
<bean ref="index" method="index"/>
<to uri="direct-vm:stageNotification"/>
</pipeline>
<split>
<simple>${body}</simple>
<bean ref="gribPersister" method="updateLogHeader"/>
<bean ref="processUtil" method="log"/>
</split>
</multicast>
</route>
</camelContext>
</beans>
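
Flow summary: decoded grids now enter GribPersister.persist at the end of the grib-decode route instead of going straight to persistIndexAlert. The persist threads then drive the gribPersistIndexAlert route above: the multicast sends each bulk-stored batch through eliminateDuplicates, index, and stageNotification on one leg, while the other leg splits the batch per record so updateLogHeader can restore each grid's original ingest headers before processUtil.log runs.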

View file

@ -1,17 +1,17 @@
# the number of grib decode threads.
grib-decode.count.threads=4
# the number of grib split threads.
grib-split.count.threads=2
# the number of grib assembler threads.
grib-assembler.count.threads=2
# the number of grib decode threads.
grib-decode.count.threads=4
# the number of grib persist threads.
grib-persister.count.threads=4
# Maximum number of grid points to decode at one time for all threads. Large
# grib files may cause the decoder to reach this limit and then some threads
# will have to wait. This can be used to control the amount of memory used by
# the decoder.
grib-decode.count.gridpoints=6000000
grib-decode.count.gridpoints=12000000
# Maximum amount of grid data in MB allowed to be waiting for grid assembly.
grib-assembler.count.mb=100
# Maximum amount of grid data in MB allowed to be held in memory waiting to be persisted.
grib-persister.count.mb=200
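
As a rough sanity check of these caps (a sketch, not part of the commit), the arithmetic below uses the same 4-bytes-per-float accounting that GribPersister applies to bytesInMemory:

public class GribMemoryEstimate {
    public static void main(String[] args) {
        long maxGridPoints = 12_000_000L; // grib-decode.count.gridpoints
        long decodeBytes = maxGridPoints * 4L; // one 4-byte float per grid point
        long persistMb = 200L; // grib-persister.count.mb
        System.out.printf("decode cap ~%.1f MB, persist cap %d MB%n",
                decodeBytes / (1024.0 * 1024.0), persistMb);
    }
}

So the grid-point cap bounds roughly 46 MB of raw float data across all decode threads, separate from the 200 MB persist buffer.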

View file

@ -27,7 +27,9 @@ import org.apache.camel.Processor;
import com.raytheon.edex.plugin.grib.exception.GribException;
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.PerformanceStatus;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
@ -45,6 +47,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
* added status to process.
* Oct 07, 2013 2402 bsteffen Decode GribDecodeMessage instead of
* files.
* Sep 14, 2015 4868 rjpeter Added logging of file being decoded.
* </pre>
*
* @author njensen
@ -56,6 +59,9 @@ public class GribDecoder implements Processor {
private final IPerformanceStatusHandler perfLog = PerformanceStatus
.getHandler("");
private final IUFStatusHandler statusHandler = UFStatus
.getHandler(GribDecoder.class);
/**
* @see org.apache.camel.Processor.process(Exchange)
*/
@ -66,45 +72,47 @@ public class GribDecoder implements Processor {
.getBody();
byte gribEdition = inMessage.getGribEdition();
exchange.getIn().setHeader("dataType", "grib" + gribEdition);
statusHandler.info("Decoding file: " + inMessage.getFileName());
ITimer timer = TimeUtil.getTimer();
GridRecord[] records = null;
timer.start();
switch (gribEdition) {
case 1:
records = new Grib1Decoder().decode(inMessage);
break;
case 2:
records = new Grib2Decoder().decode(inMessage);
break;
default:
throw new GribException("Unknown grib version detected ["
ITimer timer = TimeUtil.getTimer();
GridRecord[] records = null;
timer.start();
switch (gribEdition) {
case 1:
records = new Grib1Decoder().decode(inMessage);
break;
case 2:
records = new Grib2Decoder().decode(inMessage);
break;
default:
throw new GribException("Unknown grib version detected ["
+ gribEdition + "] in file: [" + inMessage.getFileName()
+ "]");
}
}
String datasetId = (String) headers.get("datasetid");
String secondaryId = (String) headers.get("secondaryid");
String ensembleId = (String) headers.get("ensembleid");
String datasetId = (String) headers.get("datasetid");
String secondaryId = (String) headers.get("secondaryid");
String ensembleId = (String) headers.get("ensembleid");
if (secondaryId != null || datasetId != null || ensembleId != null) {
for (GridRecord record : records) {
if (datasetId != null) {
record.setDatasetId(datasetId);
}
if (secondaryId != null) {
record.setSecondaryId(secondaryId);
}
if (ensembleId != null) {
record.setEnsembleId(ensembleId);
}
record.setDataURI(null);
if ((secondaryId != null) || (datasetId != null)
|| (ensembleId != null)) {
for (GridRecord record : records) {
if (datasetId != null) {
record.setDatasetId(datasetId);
}
if (secondaryId != null) {
record.setSecondaryId(secondaryId);
}
if (ensembleId != null) {
record.setEnsembleId(ensembleId);
}
record.setDataURI(null);
}
timer.stop();
perfLog.logDuration("Grib" + gribEdition + ": Time to Decode",
timer.getElapsedTime());
exchange.getIn().setBody(records);
}
timer.stop();
perfLog.logDuration("Grib" + gribEdition + ": Time to Decode",
timer.getElapsedTime());
exchange.getIn().setBody(records);
}

View file

@ -38,7 +38,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
/**
* Object for tracking the number of grid points currently being processed by
* the grib decode route. Grid points is used as an approximate measure of the
* amount of memory needed to decode the file, limiting the totla number of grid
* amount of memory needed to decode the file, limiting the total number of grid
* points keeps memory usage more consistent. Before a file can be decoded the
* message should pass through reserve to ensure the grib decoder has enough
* free points. After decode the points should be released.
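
The reserve/release contract above behaves like a fair counting semaphore over grid points. A minimal illustrative stand-in (not the real class, which adds a fair waiter queue and a per-thread fair share of points):

import java.util.concurrent.Semaphore;

public class GridPointThrottle {
    private final Semaphore points;

    public GridPointThrottle(int maxConcurrentGridPoints) {
        points = new Semaphore(maxConcurrentGridPoints, true); // fair ordering
    }

    public void decodeThrottled(int nx, int ny, Runnable decode)
            throws InterruptedException {
        int gridPoints = nx * ny;
        points.acquire(gridPoints); // reserve: block until enough points are free
        try {
            decode.run(); // memory-heavy decode work
        } finally {
            points.release(gridPoints); // release the points after decode
        }
    }
}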
@ -50,7 +50,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Oct 09, 2013 2402 bsteffen Initial creation
*
* Sep 14, 2015 4868 rjpeter Fix comment spelling.
* </pre>
*
* @author bsteffen
@ -73,10 +73,9 @@ public class GribGridPointLock {
private final AtomicLong currentPoints = new AtomicLong();
private Queue<Object> waiters = new ConcurrentLinkedQueue<Object>();
private final Queue<Object> waiters = new ConcurrentLinkedQueue<Object>();
public GribGridPointLock(long maxConcurrentGridPoints,
int decodeThreadCount) {
public GribGridPointLock(long maxConcurrentGridPoints, int decodeThreadCount) {
this.maxConcurrentGridPoints = maxConcurrentGridPoints;
this.fairConcurrentGridPoints = maxConcurrentGridPoints
/ decodeThreadCount;
@ -138,7 +137,7 @@ public class GribGridPointLock {
Object waiter = new Object();
synchronized (waiter) {
waiters.offer(waiter);
while (waiters.peek() != waiter
while ((waiters.peek() != waiter)
|| !fastReserve(gridPoints, true)) {
try {
waiter.wait(15000);
@ -159,7 +158,7 @@ public class GribGridPointLock {
long oldPoints = currentPoints.get();
long newPoints = oldPoints + gridPoints;
while ((ignoreWaiters || waiters.isEmpty())
&& newPoints <= maxConcurrentGridPoints) {
&& (newPoints <= maxConcurrentGridPoints)) {
if (currentPoints.compareAndSet(oldPoints, newPoints)) {
return true;
}

View file

@ -0,0 +1,411 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.grib;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.camel.Headers;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.common.dataplugin.persist.IHDFFilePathProvider;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.util.SizeUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.core.IContextStateProcessor;
import com.raytheon.uf.edex.database.plugin.PluginFactory;
/**
* Class to persist grids asynchronously from decode. Grids are queued by hdf5 file.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 10, 2015 4868 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class GribPersister implements IContextStateProcessor {
private static final String HEADERS_ATTRIBUTE = "HEADERS";
private final IUFStatusHandler statusHandler = UFStatus
.getHandler(GribPersister.class);
private final Map<String, List<GridRecord>> gridsByFile = new LinkedHashMap<>();
private final Set<String> inProcessFiles = new HashSet<>();
private volatile boolean running = true;
public IHDFFilePathProvider pathProvider;
private long maxBytesInMemory = 0;
private long bytesInMemory = 0;
private int gridsPending = 0;
private int gridsInProcess = 0;
private final GribPersistThread[] persistThreads;
public GribPersister(String pluginName, String numThreads,
String maxGridsInMb) {
pathProvider = PluginFactory.getInstance().getPathProvider(pluginName);
int numPersistThreads = 0;
try {
numPersistThreads = Integer.parseInt(numThreads);
} catch (NumberFormatException e) {
// ignore
}
if (numPersistThreads <= 0) {
numPersistThreads = 4;
statusHandler.warn("Invalid numThreads [" + numThreads
+ "], using default of [" + numPersistThreads + "]");
}
int maxInMb = 0;
try {
maxInMb = Integer.parseInt(maxGridsInMb);
} catch (NumberFormatException e) {
// ignore
}
if (maxInMb <= 0) {
maxInMb = 100;
statusHandler.warn("Invalid maxGridInMb [" + maxGridsInMb
+ "], using default of [" + maxInMb + "]");
}
maxBytesInMemory = maxInMb * 1024l * 1024l;
persistThreads = new GribPersistThread[numPersistThreads];
for (int i = 0; i < persistThreads.length; i++) {
persistThreads[i] = new GribPersistThread();
persistThreads[i].setName("GribPersist-" + (i + 1));
persistThreads[i].start();
}
}
public void persist(@Headers Map<String, Object> headers,
GridRecord... records) {
boolean storeNow = true;
if (records != null) {
storeNow = !addPendingRecords(headers, records);
}
if (storeNow) {
try {
sendToEndpoint("persistIndex", records);
} catch (Exception e) {
statusHandler.error(
"Error occurred sending grids to persistIndex", e);
}
}
}
/**
* Adds the records for storage by the GribPersistThreads. A return of true
* indicates the grids will be stored; false is returned only in the shutdown
* scenario, where the calling thread must store the final grids itself.
*
* @param headers
* @param records
* @return True if records will be stored by the GribPersistThreads.
*/
private boolean addPendingRecords(@Headers Map<String, Object> headers,
GridRecord[] records) {
if (records != null) {
StringBuilder path = new StringBuilder();
synchronized (gridsByFile) {
if (!running) {
return false;
}
for (GridRecord record : records) {
String plugin = record.getPluginName();
path.setLength(0);
path.append(pathProvider.getHDFPath(plugin, record))
.append(File.separatorChar)
.append(pathProvider.getHDFFileName(plugin, record));
String filePath = path.toString();
List<GridRecord> recs = gridsByFile.get(filePath);
if (recs == null) {
recs = new LinkedList<>();
gridsByFile.put(filePath, recs);
}
recs.add(record);
/*
* since grids will be bulk stored by file, track the
* original headers for purposes of logging and stats
*/
record.addExtraAttribute(HEADERS_ATTRIBUTE, headers);
// update bytesInMemory
bytesInMemory += ((float[]) record.getMessageData()).length * 4;
}
gridsPending += records.length;
// wake up any sleeping persist threads
gridsByFile.notifyAll();
boolean logMessage = true;
while (bytesInMemory > maxBytesInMemory) {
if (logMessage) {
statusHandler.info("Max Grids in memory for "
+ getClass().getName()
+ " exceeded. Waiting for grids to process");
logMessage = false;
}
try {
gridsByFile.wait();
} catch (InterruptedException e) {
// ignore
}
}
}
return true;
}
return false;
}
private void sendToEndpoint(String endpoint, PluginDataObject... pdos)
throws EdexException {
EDEXUtil.getMessageProducer().sendSync(endpoint, pdos);
}
/**
* Handle case of multiple quadrants stored in one transaction.
*
* @param pdos
* @return
*/
public PluginDataObject[] eliminateDuplicates(PluginDataObject... pdos) {
if ((pdos != null) && (pdos.length > 1)) {
// dup elim by dataURI
Map<String, PluginDataObject> pdoMap = new HashMap<>(pdos.length, 1);
for (PluginDataObject pdo : pdos) {
pdoMap.put(pdo.getDataURI(), pdo);
}
if (pdoMap.size() < pdos.length) {
pdos = pdoMap.values().toArray(
new PluginDataObject[pdoMap.size()]);
}
}
return pdos;
}
/**
* This is called by the splitter to get an individual log statement per
* data_store file. The hdf5 write and index are done in bulk, so the
* one-to-one relationship between data_store file and log entry must be
* restored for proper tracking and statistics. The original header object
* is kept in the extraAttributes of the GridRecord by addPendingRecords.
*
* @param record
* @param header
*/
public void updateLogHeader(GridRecord record,
@Headers Map<String, Object> header) {
@SuppressWarnings("unchecked")
Map<String, Object> recHeader = (Map<String, Object>) record
.getExtraAttributes().get(HEADERS_ATTRIBUTE);
String[] fieldsToCopy = new String[] { "dataType", "pluginName",
"ingestFileName", "dequeueTime", "enqueueTime" };
for (String field : fieldsToCopy) {
Object val = recHeader.get(field);
if (val != null) {
header.put(field, val);
}
}
}
private class GribPersistThread extends Thread {
@Override
public void run() {
while (running) {
try {
String file = null;
List<GridRecord> recordsToStore = null;
synchronized (gridsByFile) {
while (running && gridsByFile.isEmpty()) {
try {
gridsByFile.wait();
} catch (InterruptedException e) {
// ignore
}
}
if (!gridsByFile.isEmpty()) {
Iterator<String> iter = gridsByFile.keySet()
.iterator();
while (iter.hasNext()) {
file = iter.next();
if (!inProcessFiles.contains(file)) {
inProcessFiles.add(file);
recordsToStore = gridsByFile.get(file);
iter.remove();
gridsPending -= recordsToStore.size();
gridsInProcess += recordsToStore.size();
break;
}
}
if (recordsToStore == null) {
// all files currently storing on other threads
try {
gridsByFile.wait();
} catch (InterruptedException e) {
// ignore
}
continue;
}
}
}
if (recordsToStore != null) {
long timeToStore = System.currentTimeMillis();
try {
sendToEndpoint(
"gribPersistIndexAlert",
recordsToStore
.toArray(new PluginDataObject[recordsToStore
.size()]));
} catch (Exception e) {
/*
* TODO: Compile list of headers that this store
* affected.
*/
statusHandler.error(
"Error occurred persisting grids", e);
}
timeToStore = System.currentTimeMillis() - timeToStore;
long bytesFree = 0;
for (GridRecord rec : recordsToStore) {
bytesFree += ((float[]) rec.getMessageData()).length * 4;
}
int gridsLeft = 0;
int gridsStoringOnOtherThreads = 0;
long bytesUsedByGrids = 0;
synchronized (gridsByFile) {
inProcessFiles.remove(file);
bytesInMemory -= bytesFree;
bytesUsedByGrids = bytesInMemory;
gridsInProcess -= recordsToStore.size();
gridsStoringOnOtherThreads = gridsInProcess;
gridsLeft = gridsPending;
gridsByFile.notifyAll();
}
if (gridsLeft > 0) {
StringBuilder msg = new StringBuilder(80);
msg.append(gridsLeft)
.append((gridsLeft == 1 ? " grid "
: " grids "))
.append("pending, ")
.append(gridsStoringOnOtherThreads)
.append((gridsStoringOnOtherThreads == 1 ? " grid "
: " grids "))
.append("in process on other threads, ")
.append(SizeUtil
.prettyByteSize(bytesUsedByGrids))
.append(" in memory.");
statusHandler.info(msg.toString());
}
}
} catch (Throwable e) {
statusHandler.error(
"Unhandled error occurred persist grids", e);
}
}
}
}
@Override
public void preStart() {
// NOOP
}
@Override
public void postStart() {
// NOOP
}
@Override
public void preStop() {
running = false;
synchronized (gridsByFile) {
gridsByFile.notifyAll();
if (gridsByFile.size() > 0) {
statusHandler.info("Waiting for " + gridsByFile.size()
+ " hdf5 files to be persisted");
}
}
for (GribPersistThread thread : persistThreads) {
try {
thread.join();
} catch (InterruptedException e) {
// ignore
}
}
}
@Override
public void postStop() {
// NOOP
}
}
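
Design note: keying gridsByFile by hdf5 file path means each file is claimed by at most one GribPersistThread at a time (inProcessFiles guards the claim), so all grids bound for the same file go out in a single bulk store while different files persist in parallel. On shutdown, preStop lets the threads drain the queue, and any late arrivals fall back to a synchronous store on the calling thread.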

View file

@ -22,11 +22,7 @@ package com.raytheon.edex.plugin.grib.decoderpostprocessors;
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -36,10 +32,7 @@ import com.raytheon.edex.plugin.grib.exception.GribException;
import com.raytheon.edex.plugin.grib.spatial.GribSpatialCache;
import com.raytheon.edex.util.Util;
import com.raytheon.edex.util.grib.CompositeModel;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.grid.GridConstants;
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.gridcoverage.GridCoverage;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
@ -50,19 +43,9 @@ import com.raytheon.uf.common.serialization.SingleTypeJAXBManager;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.common.util.GridUtil;
import com.raytheon.uf.common.util.file.FilenameFilters;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.IContextStateProcessor;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.plugin.DataURIDatabaseUtil;
import com.raytheon.uf.edex.database.plugin.PluginFactory;
import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
import com.raytheon.uf.edex.plugin.grid.PartialGrid;
/**
* The EnsembleGridAssembler class is part of the ingest process for grib data.
@ -86,34 +69,19 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
* Apr 21, 2014 2060 njensen Remove dependency on grid dataURI column
* Jul 21, 2014 3373 bclement JAXB manager api changes
* Aug 18, 2014 4360 rferrel Set secondaryId in {@link #createAssembledRecord(GridRecord, CompositeModel)}
* Sep 09, 2015 4868 rjpeter Move grid assembly to a dedicated thread.
* Sep 09, 2015 4868 rjpeter Updated to be stored in partial grids as part of normal route.
* </pre>
*
* @author bphillip
* @version 1
*/
public class EnsembleGridAssembler implements IDecoderPostProcessor,
IContextStateProcessor {
public class EnsembleGridAssembler implements IDecoderPostProcessor {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(EnsembleGridAssembler.class);
/** The map of the models that come in sections */
private static final Map<String, CompositeModel> thinnedModels = new HashMap<>();
private static final String CLUSTER_TASK_NAME = "EnsembleGrid";
private static final Map<GridRecord, List<GridRecord>> pendingRecords = new LinkedHashMap<>();
private static long maxBytesInMemory = 0;
private static long bytesInMemory = 0;
private static GridAssembler[] assemblerThreads = null;
private static int numAssemblerThreads = 1;
private static volatile boolean running = true;
static {
loadThinnedModels();
}
@ -155,50 +123,20 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor,
}
}
public void setMaxGridsInMB(int maxInMB) {
maxBytesInMemory = maxInMB * 1024 * 1024;
}
public void setNumAssemblerThreads(int numThreads) {
if (numThreads > 0) {
numAssemblerThreads = numThreads;
} else {
statusHandler
.error("Number of assembler threads must be > 0, keeping previous value of "
+ numAssemblerThreads);
}
}
@Override
public GridRecord[] process(GridRecord rec) throws GribException {
CompositeModel compositeModel = getCompositeModel(rec.getDatasetId());
GridRecord assembledRecord = createAssembledRecord(rec, compositeModel);
if (compositeModel != null) {
if (!addPendingRecord(rec, assembledRecord)) {
// in shutdown scenario, store immediately
String lockName = assembledRecord.getDataURI();
ClusterTask ct = null;
try {
do {
ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, lockName,
120000, true);
} while (!LockState.SUCCESSFUL.equals(ct.getLockState()));
processGrids(assembledRecord, Arrays.asList(rec));
} catch (Exception e) {
throw new GribException("Error processing assembled grid",
e);
} finally {
if (ct != null) {
ClusterLockUtils.deleteLock(ct.getId().getName(), ct
.getId().getDetails());
}
}
GridRecord assembledRecord = createAssembledRecord(rec,
compositeModel);
GridRecord wrapRecord = setPartialGrid(compositeModel,
assembledRecord, rec);
if (wrapRecord == null) {
return new GridRecord[] { assembledRecord };
}
// grid was assembled in to a larger grid, discard original
return new GridRecord[0];
return new GridRecord[] { assembledRecord, wrapRecord };
}
// wasn't a grid to be assembled
@ -230,107 +168,11 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor,
newRecord.setLocation(coverage);
newRecord.setDatasetId(thinned.getModelName());
newRecord.setOverwriteAllowed(true);
return newRecord;
}
/**
* Adds a record to the queue to be assembled by the grid assembler threads.
* Returns true if the grid was added to the queue, false if the add failed
* and the grid should be assembled by the caller.
*
* @param pendingRecord
* @param assembledRecord
* @return
*/
private boolean addPendingRecord(GridRecord pendingRecord,
GridRecord assembledRecord) {
GridCoverage coverage = pendingRecord.getLocation();
long pointsInGrid = coverage.getNx() * coverage.getNy();
synchronized (pendingRecords) {
/* shutting down, record should be stored by calling thread */
if (!running) {
return false;
}
if (assemblerThreads == null) {
/*
* Start assembler threads if they haven't been started yet
*/
assemblerThreads = new GridAssembler[numAssemblerThreads];
for (int i = 0; i < assemblerThreads.length; i++) {
assemblerThreads[i] = new GridAssembler("GridAssembler-"
+ (i + 1));
assemblerThreads[i].start();
}
}
boolean logMessage = true;
bytesInMemory += pointsInGrid * 4;
List<GridRecord> pendingList = pendingRecords.get(assembledRecord);
if (pendingList == null) {
pendingList = new ArrayList<>(4);
pendingRecords.put(assembledRecord, pendingList);
}
pendingList.add(pendingRecord);
pendingRecords.notifyAll();
while (bytesInMemory > maxBytesInMemory) {
if (logMessage) {
statusHandler.info("Max Grids in " + getClass().getName()
+ " exceeded. Waiting for grids to process");
logMessage = false;
}
try {
pendingRecords.wait();
} catch (InterruptedException e) {
// ignore
}
}
}
return true;
}
/**
* Processes a single GridRecord
*
* @param record
* The GridRecord to process
* @param thinned
* The composite model for which the GridRecord is a part of
* @return The new grib record
* @throws Exception
*/
private void processGrids(GridRecord assembledRecord,
List<GridRecord> recordsToAssemble) throws Exception {
boolean exists = DataURIDatabaseUtil.existingDataURI(assembledRecord);
GridDao dao = (GridDao) PluginFactory.getInstance().getPluginDao(
GridConstants.GRID);
if (!exists) {
GridCoverage coverage = assembledRecord.getLocation();
float[] data = new float[coverage.getNx() * coverage.getNy()];
Arrays.fill(data, GridUtil.GRID_FILL_VALUE);
assembledRecord.setMessageData(data);
} else {
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(
assembledRecord, -1)[0];
assembledRecord.setMessageData(rec.getFloatData());
}
mergeData(assembledRecord, recordsToAssemble);
assembledRecord.setOverwriteAllowed(true);
assembledRecord.setInsertTime(TimeUtil.newGmtCalendar());
dao.persistRecords(assembledRecord);
EDEXUtil.getMessageProducer().sendSync("notificationAggregation",
new PluginDataObject[] { assembledRecord });
}
/**
* Merges the data from a GridRecord into the composite GridRecord
*
@ -342,225 +184,99 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor,
* The composite model definition
* @throws GribException
*/
private void mergeData(GridRecord assembledRecord,
List<GridRecord> recordsToAssemble) throws GribException {
CompositeModel thinned = thinnedModels.get(assembledRecord
.getDatasetId());
GridCoverage assembledCoverage = assembledRecord.getLocation();
float[][] assembledData = Util.resizeDataTo2D(
(float[]) assembledRecord.getMessageData(),
assembledCoverage.getNx(), assembledCoverage.getNy());
private GridRecord setPartialGrid(CompositeModel thinned,
GridRecord assembledRecord, GridRecord recordToAssemble)
throws GribException {
PartialGrid pGrid = new PartialGrid();
for (GridRecord record : recordsToAssemble) {
String modelName = record.getDatasetId();
GridCoverage coverage = record.getLocation();
String modelName = recordToAssemble.getDatasetId();
GridCoverage coverage = recordToAssemble.getLocation();
int nx = coverage.getNx();
int ny = coverage.getNy();
List<String> compModels = thinned.getModelList();
int modIndex = compModels.indexOf(modelName);
if (modIndex == -1) {
/*
* Shouldn't be possible since this is how it was found in the
* first place
*/
throw new GribException(
"Error assembling grids. Thinned grid definition does not contain "
+ modelName);
}
int nx = coverage.getNx();
int ny = coverage.getNy();
pGrid.setNx(nx);
pGrid.setNy(ny);
/*
* TODO: This should map the UL corner of recordToAssemble to
* assembledRecord instead of relying on index in list
*/
List<String> compModels = thinned.getModelList();
int modIndex = compModels.indexOf(modelName);
if (modIndex == -1) {
/*
* TODO: This should map the UL corner of record to assembledRecord
* instead of relying on index in list
* Shouldn't be possible since this is how it was found in the
* first place
*/
Util.insertSubgrid(assembledData, Util.resizeDataTo2D(
(float[]) record.getMessageData(), coverage.getNx(),
coverage.getNy()), (nx * modIndex) - modIndex, 0, nx, ny);
throw new GribException(
"Error assembling grids. Thinned grid definition does not contain "
+ modelName);
}
assembledRecord.setMessageData(Util.resizeDataTo1D(assembledData,
assembledCoverage.getNy(), assembledCoverage.getNx()));
}
pGrid.setxOffset((nx * modIndex) - modIndex);
pGrid.setyOffset(0);
assembledRecord.addExtraAttribute(PartialGrid.KEY, pGrid);
assembledRecord.setMessageData(recordToAssemble.getMessageData());
@Override
public void preStart() {
// null op
}
@Override
public void postStart() {
// null op
}
@Override
public void preStop() {
running = false;
if (assemblerThreads != null) {
synchronized (pendingRecords) {
pendingRecords.notifyAll();
if (pendingRecords.size() > 0) {
statusHandler.info("Waiting for " + pendingRecords.size()
+ " grids to be assembled");
}
}
for (GridAssembler assembler : assemblerThreads) {
try {
assembler.join();
} catch (InterruptedException e) {
// ignore
}
}
}
}
/*
* (non-Javadoc)
*
* @see com.raytheon.uf.edex.core.IContextStateProcessor#postStop()
*/
@Override
public void postStop() {
return checkWorldWrap(assembledRecord);
}
/**
* Thread to assemble sectorized grids into the overall parent grid.
* Checks if assembledRecord's partial grid has extra columns that wrap
* around. This is due to partial grids having 1 column of overlap between
* each partial grid.
*
* @param assembledRecord
* @return
*/
private class GridAssembler extends Thread {
public GridAssembler(String name) {
super(name);
private GridRecord checkWorldWrap(GridRecord assembledRecord) {
GridCoverage assembledCoverage = assembledRecord.getLocation();
int assembledNx = assembledCoverage.getNx();
PartialGrid pGrid = (PartialGrid) assembledRecord
.getExtraAttribute(PartialGrid.KEY);
int xOffset = pGrid.getxOffset();
int nx = pGrid.getNx();
int ny = pGrid.getNy();
// check world wrap due to overlapping columns
if ((xOffset + nx) > assembledNx) {
float[] messageData = (float[]) assembledRecord.getMessageData();
float[][] data2D = Util.resizeDataTo2D(messageData, nx, ny);
// cut off extra data from assembledRecord
int newNx = assembledNx - xOffset;
pGrid.setNx(newNx);
assembledRecord.setMessageData(trimGridAndMake1D(data2D, 0, 0,
newNx, ny));
// make a secondary record for the wrap amount
GridRecord wrappedRecord = new GridRecord(assembledRecord);
PartialGrid wrappedPartial = new PartialGrid();
wrappedPartial.setxOffset(0);
wrappedPartial.setyOffset(0);
wrappedPartial.setNx(nx - newNx);
wrappedPartial.setNy(ny);
wrappedRecord.addExtraAttribute(PartialGrid.KEY, wrappedPartial);
wrappedRecord.setMessageData(trimGridAndMake1D(data2D, newNx, 0,
wrappedPartial.getNx(), ny));
wrappedRecord.setOverwriteAllowed(true);
return wrappedRecord;
}
@Override
public void run() {
boolean keepProcessing = running;
long timeToAssembleGrid = 0;
do {
GridRecord compositeRecord = null;
List<GridRecord> recordsToAssemble = null;
ClusterTask ct = null;
return null;
}
try {
int index = 0;
do {
compositeRecord = getNextRecord(index);
private float[] trimGridAndMake1D(float[][] data, int xOffset, int yOffset,
int nx, int ny) {
float[][] rval = new float[ny][nx];
/*
* check compositeRecord in case a shutdown was
* triggered
*/
if (compositeRecord != null) {
String lockName = compositeRecord.getDataURI();
ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME,
lockName, 120000, false);
if (!LockState.SUCCESSFUL.equals(ct.getLockState())) {
index++;
continue;
}
synchronized (pendingRecords) {
recordsToAssemble = pendingRecords
.remove(compositeRecord);
}
}
} while ((recordsToAssemble == null) && running);
if (recordsToAssemble != null) {
long t0 = System.currentTimeMillis();
processGrids(compositeRecord, recordsToAssemble);
timeToAssembleGrid = System.currentTimeMillis() - t0;
}
} catch (Throwable e) {
statusHandler.error(
"Uncaught exception while assembling grids", e);
} finally {
if (ct != null) {
/*
* lock is time based, need to delete lock instead of
* just unlocking
*/
ClusterLockUtils.deleteLock(ct.getId().getName(), ct
.getId().getDetails());
}
if (!CollectionUtil.isNullOrEmpty(recordsToAssemble)) {
long points = 0;
for (GridRecord rec : recordsToAssemble) {
GridCoverage location = rec.getLocation();
points += location.getNx() * location.getNy();
}
int remaining = 0;
synchronized (pendingRecords) {
bytesInMemory -= points * 4;
pendingRecords.notifyAll();
remaining = pendingRecords.size();
}
int count = recordsToAssemble.size();
StringBuilder msg = new StringBuilder(80);
msg.append("Took ").append(timeToAssembleGrid)
.append("ms to merge ").append(count)
.append((count == 1 ? " grid. " : " grids. "))
.append(remaining)
.append((remaining == 1 ? " grid" : " grids"))
.append(" remaining to merge.");
statusHandler.info(msg.toString());
}
}
keepProcessing = running;
if (!keepProcessing) {
synchronized (pendingRecords) {
keepProcessing = !pendingRecords.isEmpty();
}
}
} while (keepProcessing);
}
/**
* Returns the next record to be assembled. Index allows for skipping of
* records in case the lock is being held by other threads/processes.
*
* @param index
* @return
*/
private GridRecord getNextRecord(int index) {
GridRecord rval = null;
synchronized (pendingRecords) {
// avoid holding lock for extended period
while (pendingRecords.isEmpty() && running) {
try {
pendingRecords.wait();
} catch (InterruptedException e) {
// ignore
}
}
if (pendingRecords.size() > 0) {
Iterator<GridRecord> iter = pendingRecords.keySet()
.iterator();
index %= pendingRecords.size();
// skip previously checked entries
for (int i = 0; i < index; i++) {
iter.next();
}
rval = iter.next();
}
for (int row = 0; row < ny; row++) {
for (int col = 0; col < nx; col++) {
rval[row][col] = data[yOffset + row][xOffset + col];
}
return rval;
}
return Util.resizeDataTo1D(rval, ny, nx);
}
}
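
To make the wrap arithmetic concrete (numbers hypothetical): with four 93-column sections overlapping by one column apiece, the fourth section lands at xOffset = (93 * 3) - 3 = 276. If the assembled grid is only 360 columns wide, 276 + 93 = 369 overflows by 9 columns, so checkWorldWrap trims the primary record to newNx = 360 - 276 = 84 columns and emits a second wrapped record carrying the remaining 93 - 84 = 9 columns at xOffset 0.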

View file

@ -2,9 +2,9 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Grid
Bundle-SymbolicName: com.raytheon.uf.edex.plugin.grid
Bundle-Version: 1.0.0.qualifier
Bundle-Version: 1.15.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Import-Package: com.raytheon.uf.common.inventory.tree,
com.raytheon.uf.common.dataplugin,
com.raytheon.uf.common.dataplugin.persist,
@ -17,9 +17,9 @@ Import-Package: com.raytheon.uf.common.inventory.tree,
com.raytheon.uf.edex.database.cluster,
com.raytheon.uf.edex.database.dao,
com.raytheon.uf.edex.database.plugin,
com.raytheon.uf.edex.database.query,
org.apache.commons.logging
Export-Package: com.raytheon.uf.edex.plugin.grid.dao,
com.raytheon.uf.edex.database.query
Export-Package: com.raytheon.uf.edex.plugin.grid,
com.raytheon.uf.edex.plugin.grid.dao,
com.raytheon.uf.edex.plugin.grid.handler
Require-Bundle: com.raytheon.uf.common.parameter;bundle-version="1.0.0",
com.raytheon.uf.common.geospatial;bundle-version="1.12.1174",

View file

@ -0,0 +1,108 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.edex.plugin.grid;
/**
* Designates that the messageData of the GridRecord is only a partial grid.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 11, 2015 4868 rjpeter Initial creation
*
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class PartialGrid {
public static final String KEY = PartialGrid.class.getSimpleName();
private int nx;
private int ny;
private int xOffset;
private int yOffset;
/**
* @return the nx
*/
public int getNx() {
return nx;
}
/**
* @param nx
* the nx to set
*/
public void setNx(int nx) {
this.nx = nx;
}
/**
* @return the ny
*/
public int getNy() {
return ny;
}
/**
* @param ny
* the ny to set
*/
public void setNy(int ny) {
this.ny = ny;
}
/**
* @return the xOffset
*/
public int getxOffset() {
return xOffset;
}
/**
* @param xOffset
* the xOffset to set
*/
public void setxOffset(int xOffset) {
this.xOffset = xOffset;
}
/**
* @return the yOffset
*/
public int getyOffset() {
return yOffset;
}
/**
* @param yOffset
* the yOffset to set
*/
public void setyOffset(int yOffset) {
this.yOffset = yOffset;
}
}
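
A short sketch of how a producer tags a record with this class (the attribute key and setter calls mirror setPartialGrid above; the helper itself and its arguments are hypothetical):

import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.edex.plugin.grid.PartialGrid;

public class PartialGridExample {
    /** Tag a record as one section of a larger assembled grid. */
    public static void tagAsSection(GridRecord record, int nx, int ny,
            int xOffset, int yOffset) {
        PartialGrid pGrid = new PartialGrid();
        pGrid.setNx(nx); // section width in columns
        pGrid.setNy(ny); // section height in rows
        pGrid.setxOffset(xOffset); // UL corner within the assembled grid
        pGrid.setyOffset(yOffset);
        // GridDao turns this attribute into an hdf5 partial write via
        // setMinIndex/setMaxSizes (see the GridDao change below).
        record.addExtraAttribute(PartialGrid.KEY, pGrid);
    }
}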

View file

@ -55,11 +55,13 @@ import com.raytheon.uf.common.gridcoverage.lookup.GridCoverageLookup;
import com.raytheon.uf.common.parameter.Parameter;
import com.raytheon.uf.common.parameter.lookup.ParameterLookup;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.GridUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.plugin.PluginDao;
import com.raytheon.uf.edex.plugin.grid.PartialGrid;
/**
* Data access object for accessing Grid records from the database
@ -112,8 +114,23 @@ public class GridDao extends PluginDao {
group = "/" + location.getId();
datasetName = abbrev;
}
AbstractStorageRecord storageRecord = new FloatDataRecord(
datasetName, group, (float[]) messageData, 2, sizes);
AbstractStorageRecord storageRecord = null;
Object partialGrid = gridRec.getExtraAttribute(PartialGrid.KEY);
if ((partialGrid != null) && (partialGrid instanceof PartialGrid)) {
/* Check if dataset needs to be created */
PartialGrid pGrid = (PartialGrid) partialGrid;
long[] pGridSize = new long[] { pGrid.getNx(), pGrid.getNy() };
long[] pGridOffset = new long[] { pGrid.getxOffset(),
pGrid.getyOffset() };
storageRecord = new FloatDataRecord(datasetName, group,
(float[]) messageData, 2, pGridSize);
storageRecord.setMinIndex(pGridOffset);
storageRecord.setMaxSizes(sizes);
storageRecord.setFillValue(GridUtil.GRID_FILL_VALUE);
} else {
storageRecord = new FloatDataRecord(datasetName, group,
(float[]) messageData, 2, sizes);
}
storageRecord.setCorrelationObject(gridRec);
StorageProperties sp = new StorageProperties();

View file

@ -43,6 +43,8 @@
# Jul 27, 2015 4402 njensen Set fill_time_never on write if fill value is None
# Jul 30, 2015 1574 nabowle Add deleteOrphanFiles()
# Aug 20, 2015 DR 17726 mgamazaychikov Remove __doMakeReadable method
# Sep 14, 2015 4868 rjpeter Updated writePartialHDFData to create the dataset if
# it doesn't exist.
#
import h5py, os, numpy, pypies, re, logging, shutil, time, types, traceback
@ -140,8 +142,9 @@ class H5pyDataStore(IDataStore.IDataStore):
rootNode=f['/']
group = self.__getNode(rootNode, record.getGroup(), None, create=True)
if record.getMinIndex() is not None and len(record.getMinIndex()):
ss = self.__writePartialHDFDataset(f, data, record.getDimension(), record.getSizes(),
group[record.getName()], props, record.getMinIndex())
ss = self.__writePartialHDFDataset(f, data, record.getDimension(), record.getSizes(), record.getName(),
group, props, self.__getHdf5Datatype(record), record.getMinIndex(),
record.getMaxSizes(), record.getFillValue())
else:
ss = self.__writeHDFDataset(f, data, record.getDimension(), record.getSizes(), record.getName(),
group, props, self.__getHdf5Datatype(record), storeOp, record)
@ -167,7 +170,7 @@ class H5pyDataStore(IDataStore.IDataStore):
data = data.reshape(szDims1)
ss = {}
if dataset in group.keys():
if dataset in group:
ds = group[dataset]
if storeOp == 'STORE_ONLY':
raise StorageException('Dataset ' + str(dataset) + ' already exists in group ' + str(group))
@ -217,6 +220,12 @@ class H5pyDataStore(IDataStore.IDataStore):
#dtype.set_strpad(h5t.STR_NULLTERM)
return dtype
# Common use case: arrays are passed in as x/y but the data is oriented y/x
def __reverseDimensions(self, dims):
revDims = [None, ] * len(dims)
for i in range(len(dims)):
revDims[i] = dims[len(dims) - i - 1]
return revDims
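# e.g. (illustrative): sizes [nx, ny] = [93, 277] come back as [277, 93] for
# the row-major hdf5 dataset; minIndex offsets get the same reversal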
def __calculateChunk(self, nDims, dataType, storeOp, maxDims):
if nDims == 1:
@ -253,6 +262,12 @@ class H5pyDataStore(IDataStore.IDataStore):
else:
raise NotImplementedException("Storage of " + str(nDims) + " dimensional " + \
"data with mode " + storeOp + " not supported yet")
# ensure chunk is not bigger than dimensions
if maxDims is not None:
for i in range(nDims):
chunk[i] = chunk[i] if maxDims[i] is None else min(chunk[i], maxDims[i])
chunk = tuple(chunk)
return chunk
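# e.g. (illustrative): with maxDims (277, 93), a computed 2-D chunk such as
# [256, 256] is clamped to (256, 93) so no chunk dimension exceeds the dataset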
@ -262,28 +277,38 @@ class H5pyDataStore(IDataStore.IDataStore):
for key in attrs:
dataset.attrs[key] = attrs[key]
def __writePartialHDFDataset(self, f, data, dims, szDims, ds, props,
minIndex):
# reverse sizes for hdf5
szDims1 = [None, ] * len(szDims)
for i in range(len(szDims)):
szDims1[i] = szDims[len(szDims) - i - 1]
offset = [None, ] * len(minIndex)
for i in range(len(minIndex)):
offset[i] = minIndex[len(minIndex) - i - 1]
def __writePartialHDFDataset(self, f, data, dims, szDims, dataset, group, props, dataType,
minIndex, maxSizes, fillValue):
# Change dimensions to be Y/X
szDims1 = self.__reverseDimensions(szDims)
offset = self.__reverseDimensions(minIndex)
# process chunking
# chunkSize = None
# if data.dtype != numpy._string and data.dtype != numpy._object:
# chunkSize = DEFAULT_CHUNK_SIZE
# else:
# chunkSize = 1
# chunk = [chunkSize] * len(szDims)
data = data.reshape(szDims1)
ss = {}
if dataset in group:
ds=group[dataset]
ss['op'] = 'REPLACE'
else:
if maxSizes is None:
raise StorageException('Dataset ' + dataset + ' does not exist for partial write. MaxSizes not specified to create initial dataset')
maxDims = self.__reverseDimensions(maxSizes)
nDims = len(maxDims)
chunk = self.__calculateChunk(nDims, dataType, 'STORE_ONLY', maxDims)
compression = None
if props:
compression = props.getCompression()
ds = self.__createDatasetInternal(group, dataset, dataType, maxDims, None, chunk, compression, fillValue)
ss['op'] = 'STORE_ONLY'
if ds.shape[0] < data.shape[0] or ds.shape[1] < data.shape[1]:
raise StorageException('Partial write larger than original dataset. Original shape [' + str(ds.shape) + '], partial shape [' + str(data.shape) + ']')
endIndex = [offset[0] + szDims1[0], offset[1] + szDims1[1]]
ds[offset[0]:endIndex[0], offset[1]:endIndex[1]] = data
return {'op':'REPLACE'}
return ss
def delete(self, request):
@ -349,7 +374,7 @@ class H5pyDataStore(IDataStore.IDataStore):
# recursively looks for data sets
def __hasDataSet(self, group):
for key in group.keys():
for key in group:
child=group[key]
if type(child) == h5py.highlevel.Dataset:
return True
@ -509,9 +534,7 @@ class H5pyDataStore(IDataStore.IDataStore):
# reverse sizes for hdf5
szDims = rec.getSizes()
szDims1 = [None, ] * len(szDims)
for i in range(len(szDims)):
szDims1[i] = szDims[len(szDims) - i - 1]
szDims1 = self.__reverseDimensions(szDims)
szDims = tuple(szDims1)
chunks = None