Issue #1615 Changed ProcessEvent pluginName to dataType. Refactored GribDecoder decode method to a Processor exchange method.
Change-Id: Ib3eaf4c319d844c277dd7288d565bb9508fa46f8 Former-commit-id: 6eff40bb823594da5346022eeffc15910cdfb799
This commit is contained in:
parent
4452a5d62f
commit
26f2374d54
7 changed files with 83 additions and 38 deletions
|
@ -86,7 +86,7 @@
|
|||
<pipeline>
|
||||
<bean ref="stringToFile" />
|
||||
<bean ref="largeFileChecker" />
|
||||
<bean ref="gribDecoder" method="decode" />
|
||||
<bean ref="gribDecoder" />
|
||||
<bean ref="gribSplitter" method="clean" />
|
||||
<!-- send for processing -->
|
||||
<bean ref="gribPostProcessor" method="process" />
|
||||
|
|
|
@ -21,18 +21,21 @@ package com.raytheon.edex.plugin.grib;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Processor;
|
||||
|
||||
import ucar.grib.GribChecker;
|
||||
import ucar.unidata.io.RandomAccessFile;
|
||||
|
||||
import com.raytheon.edex.esb.Headers;
|
||||
import com.raytheon.edex.plugin.grib.exception.GribException;
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.dataplugin.PluginException;
|
||||
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.edex.core.EDEXUtil;
|
||||
import com.raytheon.uf.edex.python.decoder.PythonDecoder;
|
||||
|
||||
/**
|
||||
|
@ -43,54 +46,53 @@ import com.raytheon.uf.edex.python.decoder.PythonDecoder;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* 3/12/10 4758 bphillip Initial creation
|
||||
* 02/12/2013 1615 bgonzale public decode method to a Processor exchange method.
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class GribDecoder {
|
||||
public class GribDecoder implements Processor {
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(GribDecoder.class);
|
||||
|
||||
public GribDecoder() {
|
||||
/**
|
||||
* @see org.apache.camel.Processor.process(Exchange)
|
||||
*/
|
||||
@Override
|
||||
public void process(Exchange exchange) throws Exception {
|
||||
final String DATA_TYPE = "dataType";
|
||||
final String GRIB = "grib";
|
||||
|
||||
}
|
||||
File file = (File) exchange.getIn().getBody();
|
||||
Map<String, Object> headers = exchange.getIn().getHeaders();
|
||||
|
||||
public GridRecord[] decode(File file, Headers headers) {
|
||||
GridRecord[] records = null;
|
||||
RandomAccessFile raf = null;
|
||||
int edition = 0;
|
||||
GridRecord[] records = null;
|
||||
try {
|
||||
raf = new RandomAccessFile(file.getAbsolutePath(), "r");
|
||||
raf.order(RandomAccessFile.BIG_ENDIAN);
|
||||
edition = GribChecker.getEdition(raf);
|
||||
exchange.getIn().setHeader(DATA_TYPE, GRIB + edition);
|
||||
|
||||
if (edition == 1) {
|
||||
switch (edition) {
|
||||
case 1:
|
||||
records = new Grib1Decoder().decode(file.getAbsolutePath());
|
||||
} else if (edition == 2) {
|
||||
PythonDecoder pythonDecoder = new PythonDecoder();
|
||||
pythonDecoder.setPluginName("grib");
|
||||
pythonDecoder.setPluginFQN("com.raytheon.edex.plugin.grib");
|
||||
pythonDecoder.setModuleName("GribDecoder");
|
||||
pythonDecoder.setRecordClassname(GridRecord.class.toString());
|
||||
pythonDecoder.setCache(true);
|
||||
try {
|
||||
PluginDataObject[] pdos = pythonDecoder.decode(file);
|
||||
records = new GridRecord[pdos.length];
|
||||
for (int i = 0; i < pdos.length; i++) {
|
||||
records[i] = (GridRecord) pdos[i];
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new GribException("Error decoding grib file!", e);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
case 2:
|
||||
records = decodeGrib2(file);
|
||||
break;
|
||||
default:
|
||||
throw new GribException("Unknown grib version detected ["
|
||||
+ edition + "]");
|
||||
}
|
||||
|
||||
String datasetId = (String) headers.get("datasetid");
|
||||
String secondaryId = (String) headers.get("secondaryid");
|
||||
String ensembleId = (String) headers.get("ensembleid");
|
||||
|
||||
if (secondaryId != null || datasetId != null || ensembleId != null) {
|
||||
for (GridRecord record : records) {
|
||||
if (datasetId != null) {
|
||||
|
@ -120,6 +122,36 @@ public class GribDecoder {
|
|||
"Unable to close RandomAccessFile!", e);
|
||||
}
|
||||
}
|
||||
exchange.getIn().setBody(records);
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode a grib 2 file.
|
||||
*
|
||||
* @param file
|
||||
* Grib 2 file
|
||||
* @return Array of GribRecords parsed from the file.
|
||||
* @throws PluginException
|
||||
*/
|
||||
private GridRecord[] decodeGrib2(File file)
|
||||
throws PluginException {
|
||||
GridRecord[] records = null;
|
||||
PythonDecoder pythonDecoder = new PythonDecoder();
|
||||
|
||||
pythonDecoder.setPluginName("grib");
|
||||
pythonDecoder.setPluginFQN("com.raytheon.edex.plugin.grib");
|
||||
pythonDecoder.setModuleName("GribDecoder");
|
||||
pythonDecoder.setRecordClassname(GridRecord.class.toString());
|
||||
pythonDecoder.setCache(true);
|
||||
try {
|
||||
PluginDataObject[] pdos = pythonDecoder.decode(file);
|
||||
records = new GridRecord[pdos.length];
|
||||
for (int i = 0; i < pdos.length; i++) {
|
||||
records[i] = (GridRecord) pdos[i];
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new GribException("Error decoding grib file!", e);
|
||||
}
|
||||
return records;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,11 +56,17 @@
|
|||
<!-- Begin Radar routes -->
|
||||
<route id="radarIngestRoute">
|
||||
<from uri="jms-radar:queue:Ingest.Radar?destinationResolver=#qpidDurableResolver" />
|
||||
<setHeader headerName="dataType">
|
||||
<constant>Radar-SBN</constant>
|
||||
</setHeader>
|
||||
<to uri="direct:radarcommon" />
|
||||
</route>
|
||||
|
||||
<route id="radarRadarServerIngestRoute">
|
||||
<from uri="jms-radar:queue:Ingest.RadarRadarServer?destinationResolver=#qpidDurableResolver" />
|
||||
<setHeader headerName="dataType">
|
||||
<constant>Radar-Local</constant>
|
||||
</setHeader>
|
||||
<to uri="direct:radarcommon" />
|
||||
</route>
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Oct 25, 2012 #1292 bgonzale Initial creation
|
||||
* Feb 12, 2013 #1615 bgonzale Changed pluginName to dataType.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -59,7 +60,7 @@ public class ProcessEvent extends StatisticsEvent {
|
|||
private String message;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String pluginName;
|
||||
private String dataType;
|
||||
|
||||
@DynamicSerializeElement
|
||||
private String fileName;
|
||||
|
@ -99,10 +100,10 @@ public class ProcessEvent extends StatisticsEvent {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return the pluginName
|
||||
* @return the dataType
|
||||
*/
|
||||
public String getPluginName() {
|
||||
return pluginName;
|
||||
public String getDataType() {
|
||||
return dataType;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -136,11 +137,11 @@ public class ProcessEvent extends StatisticsEvent {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param pluginName
|
||||
* the pluginName to set
|
||||
* @param dataType
|
||||
* the dataType to set
|
||||
*/
|
||||
public void setPluginName(String pluginName) {
|
||||
this.pluginName = pluginName;
|
||||
public void setDataType(String dataType) {
|
||||
this.dataType = dataType;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -59,6 +59,7 @@ import com.raytheon.uf.edex.database.plugin.PluginDao;
|
|||
* 02/07/2009 1981 dhladky Initial Creation.
|
||||
* 30NOV2012 1372 dhladky Added statistics.
|
||||
* 02/05/2013 1580 mpduff EventBus refactor.
|
||||
* 02/12/2013 1615 bgonzale Changed ProcessEvent pluginName to dataType.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -442,7 +443,7 @@ public abstract class CompositeProductGenerator implements
|
|||
String pluginName = getPluginDataObjects()[0].getPluginName();
|
||||
|
||||
if (pluginName != null) {
|
||||
processEvent.setPluginName(pluginName);
|
||||
processEvent.setDataType(pluginName);
|
||||
}
|
||||
|
||||
Long dequeueTime = message.getDeQueuedTime();
|
||||
|
|
|
@ -45,6 +45,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* Dec 1, 2008 chammack Initial creation
|
||||
* Feb 05, 2013 1580 mpduff EventBus refactor.
|
||||
* Feb 12, 2013 1615 bgonzale Changed ProcessEvent pluginName to dataType.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -127,10 +128,14 @@ public class ProcessUtil {
|
|||
StringBuilder sb = new StringBuilder(128);
|
||||
|
||||
ProcessEvent processEvent = new ProcessEvent();
|
||||
String dataType = getHeaderProperty(headers, "dataType");
|
||||
String pluginName = getHeaderProperty(headers, "pluginName");
|
||||
if (pluginName != null) {
|
||||
if (dataType != null) {
|
||||
sb.append(dataType);
|
||||
processEvent.setDataType(dataType);
|
||||
} else if (pluginName != null) {
|
||||
sb.append(pluginName);
|
||||
processEvent.setPluginName(pluginName);
|
||||
processEvent.setDataType(pluginName);
|
||||
}
|
||||
|
||||
String fileName = getHeaderProperty(headers, "ingestFileName");
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
<!-- Event Type should be fully qualified name of stat event -->
|
||||
<statisticsEvent type="com.raytheon.uf.common.stats.ProcessEvent"
|
||||
displayName="Processing Events" category="Data Ingest Events">
|
||||
<statisticsGroup name="pluginName" displayName="Data Type" />
|
||||
<statisticsGroup name="dataType" displayName="Data Type" />
|
||||
<!-- Processing time available display units:
|
||||
ms, Seconds, Minutes, Hours -->
|
||||
<statisticsAggregate field="processingTime"
|
||||
|
|
Loading…
Add table
Reference in a new issue