Merge "Issue #2402 Make grib large grid handling more dynamic. Change-Id: I463f57ac5ccbef2773ee299f36fa808c62d7da2d" into development
Former-commit-id:a99e286c0f
[formerly f0ab566352660170b71b46f6517e371d42f973df] Former-commit-id:dba525c5e6
This commit is contained in:
commit
2d1963c665
14 changed files with 320 additions and 490 deletions
22
deltaScripts/14.2.1/cleanGribQueueAndLock.sh
Normal file
22
deltaScripts/14.2.1/cleanGribQueueAndLock.sh
Normal file
|
@ -0,0 +1,22 @@
|
|||
#!/bin/bash
|
||||
# This script will delete the Ingest.Grib queue from qpid.
|
||||
# qpid must be running when this script is executed.
|
||||
#
|
||||
# This script will also remove the large grib file lock
|
||||
#
|
||||
# This update is required with 14.2.1.
|
||||
#
|
||||
|
||||
PSQL="/awips2/psql/bin/psql"
|
||||
|
||||
echo "INFO: Deleting Ingest.Grib queue."
|
||||
|
||||
curl -X DELETE http://cp1f:8180/rest/queue/edex/Ingest.Grib > /dev/null
|
||||
|
||||
echo "INFO: Deleting GribIngestLargeFile cluster locks."
|
||||
|
||||
${PSQL} -U awips -d metadata -c "delete from cluster_task where name = 'GribIngestLargeFile';"
|
||||
|
||||
echo "INFO: The update was applied successfully."
|
||||
|
||||
exit 0
|
|
@ -19,14 +19,16 @@
|
|||
<property name="maxPoolSize" value="${grib-decode.count.threads}" />
|
||||
</bean>
|
||||
|
||||
<bean id="largeFileChecker" class="com.raytheon.edex.plugin.grib.GribLargeFileChecker" />
|
||||
|
||||
<bean id="gribGridPointLock" class="com.raytheon.edex.plugin.grib.GribGridPointLock">
|
||||
<constructor-arg value="${grib-decode.count.gridpoints}"/>
|
||||
<constructor-arg value="${grib-decode.count.threads}"/>
|
||||
</bean>
|
||||
|
||||
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter" />
|
||||
|
||||
<bean id="useLatestAggregationStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy" />
|
||||
|
||||
<bean id="largeFileLockRelease" class="com.raytheon.edex.plugin.grib.GribLockRelease" />
|
||||
|
||||
<bean id="gribDecodeCamelRegistered" factory-bean="contextManager"
|
||||
factory-method="register" depends-on="persistCamelRegistered">
|
||||
<constructor-arg ref="grib-decode"/>
|
||||
|
@ -99,7 +101,7 @@
|
|||
<from ref="gribDecodeJmsEndpoint" />
|
||||
<doTry>
|
||||
<pipeline>
|
||||
<bean ref="largeFileChecker" />
|
||||
<bean ref="gribGridPointLock" method="reserve"/>
|
||||
<bean ref="gribDecoder" />
|
||||
<!-- send for processing -->
|
||||
<bean ref="gribPostProcessor" method="process" />
|
||||
|
@ -115,7 +117,7 @@
|
|||
<to uri="log:grib?level=ERROR"/>
|
||||
</doCatch>
|
||||
<doFinally>
|
||||
<bean ref="largeFileLockRelease" />
|
||||
<bean ref="gribGridPointLock" method="release"/>
|
||||
</doFinally>
|
||||
</doTry>
|
||||
</route>
|
||||
|
|
|
@ -19,14 +19,15 @@
|
|||
<property name="maxPoolSize" value="${grib-decode.count.threads}" />
|
||||
</bean>
|
||||
|
||||
<bean id="largeFileChecker" class="com.raytheon.edex.plugin.grib.GribLargeFileChecker" />
|
||||
<bean id="gribGridPointLock" class="com.raytheon.edex.plugin.grib.GribGridPointLock">
|
||||
<constructor-arg value="${grib-decode.count.gridpoints}"/>
|
||||
<constructor-arg value="${grib-decode.count.threads}"/>
|
||||
</bean>
|
||||
|
||||
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter" />
|
||||
|
||||
<bean id="useLatestAggregationStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy" />
|
||||
|
||||
<bean id="largeFileLockRelease" class="com.raytheon.edex.plugin.grib.GribLockRelease" />
|
||||
|
||||
<bean id="gribDecodeCamelRegistered" factory-bean="contextManager"
|
||||
factory-method="register" depends-on="persistCamelRegistered">
|
||||
<constructor-arg ref="grib-decode"/>
|
||||
|
@ -95,7 +96,7 @@
|
|||
<from ref="gribDecodeJmsEndpoint" />
|
||||
<doTry>
|
||||
<pipeline>
|
||||
<bean ref="largeFileChecker" />
|
||||
<bean ref="gribGridPointLock" method="reserve"/>
|
||||
<bean ref="gribDecoder" />
|
||||
<!-- send for processing -->
|
||||
<bean ref="gribPostProcessor" method="process" />
|
||||
|
@ -106,7 +107,7 @@
|
|||
<to uri="log:grib?level=ERROR"/>
|
||||
</doCatch>
|
||||
<doFinally>
|
||||
<bean ref="largeFileLockRelease" />
|
||||
<bean ref="gribGridPointLock" method="release"/>
|
||||
</doFinally>
|
||||
</doTry>
|
||||
</route>
|
||||
|
|
|
@ -1,2 +1,7 @@
|
|||
# the number of grib decode threads.
|
||||
grib-decode.count.threads=4
|
||||
# Maximum number of grid points to decode at one time for all threads. Large
|
||||
# grib files may cause the decoder to reach this limit and then some threads
|
||||
# will have to wait. This can be used to control the amount of memory used by
|
||||
# the decoder.
|
||||
grib-decode.count.gridpoints=6000000
|
|
@ -96,7 +96,7 @@ import com.raytheon.uf.common.util.mapping.MultipleMappingException;
|
|||
* Feb 15, 2013 1638 mschenke Moved array based utilities from Util
|
||||
* into ArraysUtil
|
||||
* Aug 30, 2013 2298 rjpeter Make getPluginName abstract
|
||||
* Oct 07, 2013 2042 bsteffen Decode GribDecodeMessage instead of
|
||||
* Oct 07, 2013 2402 bsteffen Decode GribDecodeMessage instead of
|
||||
* files.
|
||||
*
|
||||
* </pre>
|
||||
|
|
|
@ -37,7 +37,7 @@ import com.raytheon.uf.edex.python.decoder.PythonDecoder;
|
|||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------- -------- ----------- --------------------------
|
||||
* Oct 04, 2013 2042 bsteffen Initial creation
|
||||
* Oct 04, 2013 2402 bsteffen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
|
|
@ -34,7 +34,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
|||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------- -------- ----------- --------------------------
|
||||
* Oct 03, 2013 2041 bsteffen Initial creation
|
||||
* Oct 03, 2013 2402 bsteffen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -59,16 +59,20 @@ public class GribDecodeMessage implements Serializable {
|
|||
@DynamicSerializeElement
|
||||
private byte gribEdition;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private long gridPointCount;
|
||||
|
||||
public GribDecodeMessage() {
|
||||
|
||||
}
|
||||
|
||||
public GribDecodeMessage(String str) {
|
||||
String[] parts = str.split("::");
|
||||
String[] parts = str.split(":", 5);
|
||||
startPosition = Long.valueOf(parts[0]);
|
||||
messageLength = Long.valueOf(parts[1]);
|
||||
gribEdition = Byte.valueOf(parts[2]);
|
||||
fileName = parts[3];
|
||||
gridPointCount = Byte.valueOf(parts[3]);
|
||||
fileName = parts[4];
|
||||
|
||||
}
|
||||
|
||||
|
@ -104,8 +108,16 @@ public class GribDecodeMessage implements Serializable {
|
|||
this.gribEdition = gribEdition;
|
||||
}
|
||||
|
||||
public long getGridPointCount() {
|
||||
return gridPointCount;
|
||||
}
|
||||
|
||||
public void setGridPointCount(long gridPointCount) {
|
||||
this.gridPointCount = gridPointCount;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return startPosition + "::" + messageLength + "::" + gribEdition + "::"
|
||||
+ fileName;
|
||||
return startPosition + ":" + messageLength + ":" + gribEdition + ":"
|
||||
+ gridPointCount + ":" + fileName;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ import com.raytheon.uf.common.time.util.TimeUtil;
|
|||
* exchange method.
|
||||
* Mar 19, 2013 1785 bgonzale Added performance status handler and
|
||||
* added status to process.
|
||||
* Oct 07, 2013 2042 bsteffen Decode GribDecodeMessage instead of
|
||||
* Oct 07, 2013 2402 bsteffen Decode GribDecodeMessage instead of
|
||||
* files.
|
||||
* </pre>
|
||||
*
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.edex.plugin.grib;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.camel.Body;
|
||||
import org.apache.camel.Headers;
|
||||
|
||||
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.PerformanceStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.util.ITimer;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
|
||||
/**
|
||||
* Object for tracking the number of grid points currently being processed by
|
||||
* the grib decode route. Grid points is used as an approximate measure of the
|
||||
* amount of memory needed to decode the file, limiting the totla number of grid
|
||||
* points keeps memory usage more consistent. Before a file can be decoded the
|
||||
* message should pass through reserve to ensure the grib decoder has enough
|
||||
* free points. After decode the points should be released.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------- -------- ----------- --------------------------
|
||||
* Oct 09, 2013 2402 bsteffen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bsteffen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class GribGridPointLock {
|
||||
|
||||
public static final String GRID_POINT_COUNT_HEADER = "gridPointCount";
|
||||
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(GribGridPointLock.class);
|
||||
|
||||
private final IPerformanceStatusHandler perfLog = PerformanceStatus
|
||||
.getHandler("");
|
||||
|
||||
private final long maxConcurrentGridPoints;
|
||||
|
||||
private final long fairConcurrentGridPoints;
|
||||
|
||||
private final AtomicLong currentPoints = new AtomicLong();
|
||||
|
||||
private Queue<Object> waiters = new ConcurrentLinkedQueue<Object>();
|
||||
|
||||
public GribGridPointLock(long maxConcurrentGridPoints,
|
||||
int decodeThreadCount) {
|
||||
this.maxConcurrentGridPoints = maxConcurrentGridPoints;
|
||||
this.fairConcurrentGridPoints = maxConcurrentGridPoints
|
||||
/ decodeThreadCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reserve a message for decode. This method will wait until there are
|
||||
* enough grid points available. Grid points are reserved until release is
|
||||
* called.
|
||||
*
|
||||
* @param message
|
||||
* the message to decode
|
||||
* @param headers
|
||||
* used to store information used in release.
|
||||
*/
|
||||
public void reserve(@Body GribDecodeMessage message,
|
||||
@Headers Map<String, Object> headers) {
|
||||
long gridPoints = message.getGridPointCount();
|
||||
if (gridPoints > maxConcurrentGridPoints) {
|
||||
statusHandler
|
||||
.handle(Priority.EVENTA,
|
||||
String.format(
|
||||
"Large grib file requires exclusive access: (%d > %d): %s",
|
||||
gridPoints, maxConcurrentGridPoints,
|
||||
message.getFileName()));
|
||||
gridPoints = maxConcurrentGridPoints;
|
||||
} else if (gridPoints > fairConcurrentGridPoints) {
|
||||
statusHandler
|
||||
.handle(Priority.EVENTA,
|
||||
String.format(
|
||||
"Large grib file is using many grid points: (%d of %d): %s",
|
||||
gridPoints, maxConcurrentGridPoints,
|
||||
message.getFileName()));
|
||||
}
|
||||
reserve(gridPoints);
|
||||
headers.put(GRID_POINT_COUNT_HEADER, gridPoints);
|
||||
headers.put("dequeueTime", System.currentTimeMillis());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Release grid points after decode.
|
||||
*
|
||||
* @param headers
|
||||
* containing information set in reserve
|
||||
*/
|
||||
public void release(@Headers Map<String, Object> headers) {
|
||||
Object gridPointsHeader = headers.get(GRID_POINT_COUNT_HEADER);
|
||||
if (gridPointsHeader instanceof Number) {
|
||||
long gridPoints = ((Number) gridPointsHeader).longValue();
|
||||
release(gridPoints);
|
||||
}
|
||||
}
|
||||
|
||||
private void reserve(long gridPoints) {
|
||||
if (!fastReserve(gridPoints, false)) {
|
||||
ITimer timer = TimeUtil.getTimer();
|
||||
timer.start();
|
||||
Object waiter = new Object();
|
||||
synchronized (waiter) {
|
||||
waiters.offer(waiter);
|
||||
while (waiters.peek() != waiter
|
||||
|| !fastReserve(gridPoints, true)) {
|
||||
try {
|
||||
waiter.wait(15000);
|
||||
} catch (InterruptedException e) {
|
||||
;
|
||||
}
|
||||
}
|
||||
}
|
||||
waiters.remove(waiter);
|
||||
notifyNext();
|
||||
timer.stop();
|
||||
perfLog.logDuration("Grib: Time waiting to reserve grid points",
|
||||
timer.getElapsedTime());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean fastReserve(long gridPoints, boolean ignoreWaiters) {
|
||||
long oldPoints = currentPoints.get();
|
||||
long newPoints = oldPoints + gridPoints;
|
||||
while ((ignoreWaiters || waiters.isEmpty())
|
||||
&& newPoints <= maxConcurrentGridPoints) {
|
||||
if (currentPoints.compareAndSet(oldPoints, newPoints)) {
|
||||
return true;
|
||||
}
|
||||
oldPoints = this.currentPoints.get();
|
||||
newPoints = oldPoints + gridPoints;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void release(long gridPoints) {
|
||||
currentPoints.addAndGet(-gridPoints);
|
||||
notifyNext();
|
||||
}
|
||||
|
||||
private void notifyNext() {
|
||||
Object next = waiters.peek();
|
||||
if (next != null) {
|
||||
synchronized (next) {
|
||||
next.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,247 +0,0 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.edex.plugin.grib;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Processor;
|
||||
|
||||
import com.raytheon.edex.plugin.grib.exception.GribException;
|
||||
import com.raytheon.uf.common.localization.IPathManager;
|
||||
import com.raytheon.uf.common.localization.LocalizationContext;
|
||||
import com.raytheon.uf.common.localization.PathManagerFactory;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||
|
||||
/**
|
||||
* Implementation of a Camel Message Processor to check if the ingested grib
|
||||
* file matches one of the designated "large" grib files. If this processor
|
||||
* detects a file containing one of WMO regexes contained in the
|
||||
* largeGribPatterns.xml, all other grib ingest threads will block to wait for
|
||||
* this file to finish processing. Cluster locking is used. The lock is obtained
|
||||
* in this Processor. A second processor, GribLockRelease, is used to release
|
||||
* the lock once the file is done being processed.
|
||||
* <p>
|
||||
* This processor was put in place to ensure large grib files will not throw out
|
||||
* of memory exceptions while processing.
|
||||
*
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------- -------- ----------- --------------------------
|
||||
* Oct 15, 2010 6644 bphillip Initial Creation
|
||||
* Jul 18, 2013 2194 bsteffen Fix site override.
|
||||
* Oct 07, 2013 2042 bsteffen Decode GribDecodeMessage instead of
|
||||
* files.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
* @see com.raytheon.edex.plugin.grib.GribLockRelease
|
||||
*/
|
||||
public class GribLargeFileChecker implements Processor {
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(GribLargeFileChecker.class);
|
||||
|
||||
/**
|
||||
* The header attribute attached to the message to denote whether this file
|
||||
* will receive single threaded handling
|
||||
*/
|
||||
public static final String LARGE_FILE_HEADER = "largeFile";
|
||||
|
||||
/** The cluster task name used for cluster locking */
|
||||
public static final String CLUSTER_TASK_NAME = "GribIngestLargeFile";
|
||||
|
||||
public static final String CLUSTER_TASK_DETAILS;
|
||||
|
||||
/**
|
||||
* The base localization patterns used to specify which WMO header regexes
|
||||
* receive special handling
|
||||
*/
|
||||
private static LargeGribPatterns basePatterns;
|
||||
|
||||
/**
|
||||
* The site localization patterns used to specify which WMO header regexes
|
||||
* receive special handling
|
||||
*/
|
||||
private static LargeGribPatterns sitePatterns;
|
||||
|
||||
static {
|
||||
String host = null;
|
||||
try {
|
||||
// Initialize the cluster task name with the host name
|
||||
host = InetAddress.getLocalHost().getHostName();
|
||||
} catch (UnknownHostException e) {
|
||||
e.printStackTrace();
|
||||
host = "";
|
||||
}
|
||||
CLUSTER_TASK_DETAILS = host;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Exchange exchange) throws Exception {
|
||||
if (basePatterns == null) {
|
||||
loadPatterns();
|
||||
}
|
||||
GribDecodeMessage message = (GribDecodeMessage) exchange.getIn()
|
||||
.getBody();
|
||||
File gribFile = new File(message.getFileName());
|
||||
String header = (String) exchange.getIn().getHeader("header");
|
||||
if (header == null) {
|
||||
// No header entry so will try and use the filename instead
|
||||
header = gribFile.getName();
|
||||
}
|
||||
|
||||
ClusterTask task = ClusterLockUtils.lookupLock(CLUSTER_TASK_NAME,
|
||||
CLUSTER_TASK_DETAILS);
|
||||
boolean waitForLargeGrib = task.isRunning();
|
||||
|
||||
while (waitForLargeGrib) {
|
||||
Thread.sleep(500);
|
||||
task = ClusterLockUtils.lookupLock(CLUSTER_TASK_NAME,
|
||||
CLUSTER_TASK_DETAILS);
|
||||
waitForLargeGrib = task.isRunning();
|
||||
|
||||
// need to handle timing out of large grib file process manually
|
||||
if (waitForLargeGrib
|
||||
&& (System.currentTimeMillis() - task.getLastExecution()) > 120000) {
|
||||
statusHandler
|
||||
.handle(Priority.EVENTA,
|
||||
"Large Grib file process timed out. Clearing lock and resuming processing");
|
||||
ClusterLockUtils
|
||||
.unlock(CLUSTER_TASK_NAME, CLUSTER_TASK_DETAILS);
|
||||
waitForLargeGrib = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (isLarge(header)) {
|
||||
statusHandler.handle(Priority.EVENTA,
|
||||
"Large Grib file detected. Establishing lock.");
|
||||
while (!ClusterLockUtils
|
||||
.lock(CLUSTER_TASK_NAME, CLUSTER_TASK_DETAILS, 120000, true)
|
||||
.getLockState().equals(LockState.SUCCESSFUL)) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
statusHandler.handle(Priority.EVENTA,
|
||||
"Large Grib file lock established.");
|
||||
// Wait for other threads to complete processing before we proceed
|
||||
Thread.sleep(1000);
|
||||
exchange.getIn().setHeader(LARGE_FILE_HEADER, true);
|
||||
|
||||
} else {
|
||||
exchange.getIn().setHeader(LARGE_FILE_HEADER, false);
|
||||
}
|
||||
exchange.getIn().setHeader("dequeueTime", System.currentTimeMillis());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the WMO header patterns for grib files which receive special
|
||||
* handling
|
||||
*
|
||||
* @throws GribException
|
||||
* If the patterns cannot be loaded
|
||||
*/
|
||||
private void loadPatterns() throws GribException {
|
||||
IPathManager pathMgr = PathManagerFactory.getPathManager();
|
||||
LocalizationContext commonStaticBase = pathMgr.getContext(
|
||||
LocalizationContext.LocalizationType.EDEX_STATIC,
|
||||
LocalizationContext.LocalizationLevel.BASE);
|
||||
|
||||
LocalizationContext siteStaticBase = pathMgr.getContext(
|
||||
LocalizationContext.LocalizationType.EDEX_STATIC,
|
||||
LocalizationContext.LocalizationLevel.SITE);
|
||||
String path = "";
|
||||
String sitePath = "";
|
||||
try {
|
||||
path = pathMgr.getFile(commonStaticBase,
|
||||
"grib" + File.separator + "largeGribPatterns.xml")
|
||||
.getCanonicalPath();
|
||||
sitePath = pathMgr.getFile(siteStaticBase,
|
||||
"grib" + File.separator + "largeGribPatterns.xml")
|
||||
.getCanonicalPath();
|
||||
} catch (IOException e) {
|
||||
throw new GribException("Error reading large grib patterns", e);
|
||||
}
|
||||
|
||||
File modelFile = new File(path);
|
||||
File siteModelFile = new File(sitePath);
|
||||
if (siteModelFile.exists()) {
|
||||
sitePatterns = loadPatterns(siteModelFile);
|
||||
}
|
||||
basePatterns = loadPatterns(modelFile);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the regex patterns from the specified file
|
||||
*
|
||||
* @param modelFile
|
||||
* The file to load the regex patterns from
|
||||
* @return An object containing the compiled regex patterns
|
||||
* @throws GribException
|
||||
* If the patterns cannot be loaded
|
||||
*/
|
||||
private LargeGribPatterns loadPatterns(File modelFile) throws GribException {
|
||||
LargeGribPatterns patternSet = null;
|
||||
try {
|
||||
patternSet = (LargeGribPatterns) SerializationUtil
|
||||
.jaxbUnmarshalFromXmlFile(modelFile.getPath());
|
||||
} catch (Exception e) {
|
||||
throw new GribException("File " + modelFile.getAbsolutePath()
|
||||
+ " could not be unmarshalled.", e);
|
||||
}
|
||||
patternSet.compilePatterns();
|
||||
return patternSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the provided header matches the regexes designated for
|
||||
* special handling
|
||||
*
|
||||
* @param header
|
||||
* The header to check
|
||||
* @return True if the file associated with this WMO header is to receive
|
||||
* special handling
|
||||
*/
|
||||
private boolean isLarge(String header) {
|
||||
boolean isLarge = false;
|
||||
if (sitePatterns != null) {
|
||||
isLarge = sitePatterns.isDesiredHeader(header);
|
||||
}
|
||||
if (!isLarge) {
|
||||
isLarge = basePatterns.isDesiredHeader(header);
|
||||
}
|
||||
return isLarge;
|
||||
}
|
||||
}
|
|
@ -1,69 +0,0 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.edex.plugin.grib;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Processor;
|
||||
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||
|
||||
/**
|
||||
* Releases lock obtained in the GribLargeFileChecker class if one was
|
||||
* established.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* 10/15/10 6644 bphillip Initial Creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
* @see com.raytheon.edex.plugin.grib.GribLargeFileChecker
|
||||
*/
|
||||
public class GribLockRelease implements Processor {
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(GribLockRelease.class);
|
||||
|
||||
@Override
|
||||
public void process(Exchange exchange) throws Exception {
|
||||
Boolean isLargeFile = (Boolean) exchange.getIn().getHeader(
|
||||
GribLargeFileChecker.LARGE_FILE_HEADER);
|
||||
if (isLargeFile) {
|
||||
boolean success = ClusterLockUtils.unlock(
|
||||
GribLargeFileChecker.CLUSTER_TASK_NAME,
|
||||
GribLargeFileChecker.CLUSTER_TASK_DETAILS);
|
||||
if (success) {
|
||||
statusHandler.handle(Priority.EVENTA,
|
||||
"Large Grib file lock released!");
|
||||
} else {
|
||||
statusHandler.handle(Priority.CRITICAL,
|
||||
"Large Grib file lock could not be released!!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,6 +29,8 @@ import ucar.unidata.io.KMPMatch;
|
|||
import ucar.unidata.io.RandomAccessFile;
|
||||
|
||||
import com.raytheon.edex.plugin.grib.exception.GribException;
|
||||
import com.raytheon.edex.plugin.grib.spatial.GribSpatialCache;
|
||||
import com.raytheon.uf.common.gridcoverage.GridCoverage;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
|
@ -61,7 +63,7 @@ public class GribSplitter {
|
|||
List<GribDecodeMessage> messages = new ArrayList<GribDecodeMessage>();
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
raf = new RandomAccessFile(file.getAbsolutePath(), "r", 1024);
|
||||
raf = new RandomAccessFile(file.getAbsolutePath(), "r");
|
||||
raf.order(RandomAccessFile.BIG_ENDIAN);
|
||||
while (raf.searchForward(matcher, Integer.MAX_VALUE)) {
|
||||
GribDecodeMessage message = new GribDecodeMessage();
|
||||
|
@ -73,6 +75,22 @@ public class GribSplitter {
|
|||
message.setGribEdition((byte) is.getGribEdition());
|
||||
long length = is.getGribLength();
|
||||
message.setMessageLength(length);
|
||||
switch (is.getGribEdition()) {
|
||||
case 1:
|
||||
message.setGridPointCount(getGrib1GridPointCount(raf,
|
||||
startPosition));
|
||||
break;
|
||||
case 2:
|
||||
message.setGridPointCount(getGrib2GridPointCount(raf,
|
||||
startPosition, length));
|
||||
break;
|
||||
default:
|
||||
/* This is not a grid we can handle. Let the message proceed to the decoder to throw errors.*/
|
||||
message.setGridPointCount(0);
|
||||
break;
|
||||
}
|
||||
System.out.println("Number of points is: "
|
||||
+ message.getGridPointCount());
|
||||
messages.add(message);
|
||||
raf.seek(startPosition + length);
|
||||
/*
|
||||
|
@ -99,4 +117,59 @@ public class GribSplitter {
|
|||
return messages;
|
||||
}
|
||||
|
||||
private long getGrib2GridPointCount(RandomAccessFile raf,
|
||||
long startPosition, long messageLength) throws IOException {
|
||||
long gridPointCount = 0;
|
||||
long totalGridPointCount = 0;
|
||||
long start = startPosition + 16;
|
||||
while (start < startPosition + messageLength - 4) {
|
||||
raf.seek(start);
|
||||
int length = raf.readInt();
|
||||
int section = raf.readByte();
|
||||
switch (section) {
|
||||
case 3:
|
||||
raf.seek(start + 30);
|
||||
int nx = raf.readInt();
|
||||
int ny = raf.readInt();
|
||||
gridPointCount = nx * ny;
|
||||
break;
|
||||
case 7:
|
||||
totalGridPointCount += gridPointCount;
|
||||
break;
|
||||
}
|
||||
start += length;
|
||||
}
|
||||
return totalGridPointCount;
|
||||
}
|
||||
|
||||
private long getGrib1GridPointCount(RandomAccessFile raf,
|
||||
long startPosition)
|
||||
throws IOException {
|
||||
raf.seek(startPosition + 8);
|
||||
int pdsLength = (raf.readUnsignedShort() << 8) + raf.readUnsignedByte();
|
||||
raf.skipBytes(3);
|
||||
int grid = raf.readUnsignedByte();
|
||||
int gdsPresent = raf.readUnsignedByte() & 0x80;
|
||||
if (grid != 255) {
|
||||
GridCoverage coverage = GribSpatialCache.getInstance()
|
||||
.getGridByName(String.valueOf(grid));
|
||||
if (coverage != null) {
|
||||
return coverage.getNx() * coverage.getNy();
|
||||
}
|
||||
}
|
||||
if (gdsPresent != 0) {
|
||||
raf.seek(startPosition + 8 + pdsLength + 6);
|
||||
/*
|
||||
* Note: for Quasi-regular grids nx or ny may be coded as 0xFFFF
|
||||
* which will result ina dramatic overestimate of the number of grid
|
||||
* points. All such known grids are identified by grid number above.
|
||||
*/
|
||||
int nx = raf.readUnsignedShort();
|
||||
int ny = raf.readUnsignedShort();
|
||||
return nx * ny;
|
||||
}
|
||||
/* This is not a grid we can handle. Let the message proceed to the decoder to throw errors.*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,130 +0,0 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.edex.plugin.grib;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlElements;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import com.raytheon.uf.common.serialization.ISerializableObject;
|
||||
|
||||
/**
|
||||
* Object containing the Grib Patterns which receive single threaded decode
|
||||
* handling.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* 10/15/10 6644 bphillip Initial Creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
* @version 1
|
||||
* @see com.raytheon.edex.plugin.grib.GribLargeFileChecker
|
||||
* @see com.raytheon.edex.plugin.grib.GribLockRelease
|
||||
*/
|
||||
@XmlRootElement(name = "largeGribPatterns")
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
public class LargeGribPatterns implements ISerializableObject {
|
||||
/**
|
||||
* List of patterns
|
||||
*/
|
||||
@XmlElements({ @XmlElement(name = "regex", type = String.class) })
|
||||
private List<String> patterns;
|
||||
|
||||
/** List of compiled patterns */
|
||||
private List<Pattern> compiledPatterns;
|
||||
|
||||
/**
|
||||
* Creates a new instance of the container.
|
||||
*/
|
||||
public LargeGribPatterns() {
|
||||
this.patterns = new ArrayList<String>();
|
||||
this.compiledPatterns = new ArrayList<Pattern>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of the stored patterns as a series of strings.
|
||||
*
|
||||
* @return a list of regex pattern strings
|
||||
*/
|
||||
public List<String> getPatterns() {
|
||||
return patterns;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the list of regex strings for this container.
|
||||
*
|
||||
* @param patterns
|
||||
* an arraylist of regex strings
|
||||
*/
|
||||
public void setPatterns(ArrayList<String> patterns) {
|
||||
this.patterns = patterns;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts a single string into the list.
|
||||
*
|
||||
* @param pattern
|
||||
* The regex string to insert
|
||||
*/
|
||||
public void setPattern(String pattern) {
|
||||
this.patterns.add(pattern);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will compile the strings into Pattern objects.
|
||||
*
|
||||
*/
|
||||
public void compilePatterns() {
|
||||
for (String pattern : patterns) {
|
||||
compiledPatterns.add(Pattern.compile(pattern));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes a string and compares against the patterns in this container. The
|
||||
* first one that matches breaks the search and returns true.
|
||||
*
|
||||
* @param header
|
||||
* The string to search for
|
||||
* @return a boolean indicating success
|
||||
*/
|
||||
public boolean isDesiredHeader(String header) {
|
||||
boolean isFound = false;
|
||||
for (Pattern headerPattern : compiledPatterns) {
|
||||
if (headerPattern.matcher(header).find()) {
|
||||
isFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return isFound;
|
||||
}
|
||||
}
|
|
@ -1,25 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
|
||||
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
|
||||
|
||||
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
|
||||
This_software_product_contains_export-restricted_data_whose
|
||||
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
|
||||
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
|
||||
an_export_license_or_other_authorization.
|
||||
|
||||
Contractor_Name:________Raytheon_Company
|
||||
Contractor_Address:_____6825_Pine_Street,_Suite_340
|
||||
________________________Mail_Stop_B8
|
||||
________________________Omaha,_NE_68106
|
||||
________________________402.291.0100
|
||||
|
||||
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
|
||||
further_licensing_information.
|
||||
-->
|
||||
<largeGribPatterns xmlns:ns2="group">
|
||||
<regex>^al*</regex>
|
||||
<regex>^tpcprblty*</regex>
|
||||
<regex>^LGX[TP]*</regex>
|
||||
</largeGribPatterns>
|
Loading…
Add table
Reference in a new issue