Merge "Issue #2113 Improved MADIS ingest to be more resource frugal, target version fixed" into development

Former-commit-id: d6bedc07ce [formerly 8d139f3325] [formerly d6bedc07ce [formerly 8d139f3325] [formerly 2e1d54e686 [formerly 4979a93f2cf0d697d4565641be3de40f4e1e4610]]]
Former-commit-id: 2e1d54e686
Former-commit-id: b2cb6803fd [formerly 728f54c6ea]
Former-commit-id: 1e95673d90
This commit is contained in:
Dustin Johnson 2013-06-18 13:55:51 -05:00 committed by Gerrit Code Review
commit 8a608c1ef6
4 changed files with 164 additions and 23 deletions

View file

@ -255,6 +255,7 @@
<exclude>.*dpa.*</exclude>
<!-- exclude OGC services -->
<exclude>.*ogc.*</exclude>
<exclude>grid-metadata.xml</exclude>
</mode>
<!-- modes listed below are not supported in a production setting, they
@ -343,7 +344,7 @@
<include>madis-common.xml</include>
<include>pointdata-common.xml</include>
<include>madis-dpa-ingest.xml</include>
<include>madis-ogc.xml</include>-->
<include>madis-ogc.xml</include>
<!-- pointdata/obs specific services
<include>obs-common.xml</include>
<include>pointdata-common.xml</include>

View file

@ -42,6 +42,8 @@
<bean id="madisSeparator" class="com.raytheon.uf.edex.plugin.madis.MadisSeparator" depends-on="jmsIngestMadisConfig, jms-madis, madisThreadPool">
<constructor-arg value="jms-madis:queue:Ingest.madisSeparator?destinationResolver=#qpidDurableResolver" />
<!-- time in hours for orphan purging -->
<constructor-arg value="1" />
</bean>
<camelContext id="madis-camel"
@ -86,7 +88,6 @@
</setHeader>
<doTry>
<pipeline>
<bean ref="serializationUtil" method="transformFromThrift" />
<bean ref="madisDecoder" method="decode" />
<bean ref="madisLayerCollector" method="geoFilter"/>
<bean ref="madisPointData" method="toPointData" />
@ -107,5 +108,11 @@
<bean ref="madisLayerCollector" method="buildLayerUpdate" />
</route>
<route id="madisOrphanPurgeRoute">
<!-- purge madis orphan files based on hour in separator constructor arg -->
<from uri="quartz://madis/orphan?cron=0+0+*+*+*+?" />
<bean ref="madisSeparator" method="fileCleaner" />
</route>
</camelContext>
</beans>

View file

@ -20,6 +20,7 @@
package com.raytheon.uf.edex.plugin.madis;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -51,6 +52,7 @@ import com.vividsolutions.jts.geom.Coordinate;
* Date Ticket# Engineer Description
* ----------- ---------- ----------- --------------------------
* 3/27/13 1746 dhladky Initial creation
* 6/17/13 2113 dhladky QPID memory usage alleviation
* </pre>
*
* @author dhladky
@ -108,11 +110,13 @@ public class MadisDecoder extends AbstractDecoder {
* @return
* @throws DecoderException
*/
public PluginDataObject[] decode(MadisIngestObject mio)
public PluginDataObject[] decode(String path)
throws DecoderException {
PluginDataObject[] retVal = new PluginDataObject[0];
// de-serialize the object from the path
long time = System.currentTimeMillis();
MadisIngestObject mio = MadisSeparator.getObject(path);
PluginDataObject[] retVal = new PluginDataObject[0];
if (mio != null) {
@ -143,6 +147,12 @@ public class MadisDecoder extends AbstractDecoder {
} catch (Exception e) {
statusHandler.handle(Priority.ERROR,
"Could not open MADIS Ingest Object!", e);
} finally {
// discard file
File file = new File(path);
if (file.exists()) {
file.delete();
}
}
if (!retList.isEmpty()) {

View file

@ -2,9 +2,14 @@ package com.raytheon.uf.edex.plugin.madis;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.regex.Pattern;
import com.raytheon.edex.exception.DecoderException;
@ -13,6 +18,7 @@ import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
@ -25,6 +31,7 @@ import com.raytheon.uf.edex.core.EdexException;
* Date Ticket# Engineer Description
* ----------- ---------- ----------- --------------------------
* 5/18/13 753 dhladky Initial creation
* 6/17/13 2113 dhladky QPID memory usage alleviation
* </pre>
*
* @author dhladky
@ -36,13 +43,21 @@ public class MadisSeparator {
private static final Pattern regex = Pattern.compile(",");
public static final String pathPrefix = EDEXUtil.EDEX_HOME + File.separatorChar + "data"
+ File.separatorChar + "madis" + File.separatorChar;
private static final String pathSuffix = ".madis";
private String madisRoute;
private int timeback;
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(MadisSeparator.class);
public MadisSeparator(String madisRoute) {
public MadisSeparator(String madisRoute, int timeback) {
this.madisRoute = madisRoute;
this.timeback = timeback;
}
public void separate(byte[] inputData)
@ -79,15 +94,13 @@ public class MadisSeparator {
} else {
// breaks data into chunks of 50,000 lines each
if (i % 50000 == 0) {
if (i != 0) {
// write to queue
sendObject(mio, madisRoute);
// write to queue and filesystem
sendFile(mio);
long time4 = System.currentTimeMillis();
statusHandler.handle(Priority.INFO,
"MADIS separated record wrote: " + j + " "
+ (time4 - time3) + " ms");
j++;
}
// reset everything
time3 = System.currentTimeMillis();
mio = new MadisIngestObject(headerType);
@ -99,7 +112,8 @@ public class MadisSeparator {
}
// flush the last record out
if (mio != null && !mio.getLines().isEmpty()) {
sendObject(mio, madisRoute);
// write to queue and filesystem
sendFile(mio);
long time4 = System.currentTimeMillis();
statusHandler.handle(Priority.INFO,
"MADIS separated record wrote: "+j+" "
@ -149,19 +163,128 @@ public class MadisSeparator {
}
/**
* Send the object
* Writes the object to the File System
* @param mio
*/
private static void sendObject(MadisIngestObject mio, String route) {
private static void sendObject(MadisIngestObject mio, String path) throws Exception {
FileOutputStream fos = null;
try {
File file = new File(path);
file.createNewFile();
fos = new FileOutputStream(file);
SerializationUtil.transformToThriftUsingStream(mio, fos);
} catch (FileNotFoundException e) {
statusHandler.handle(Priority.PROBLEM, "Couldn't create file", e);
throw new Exception("Unable to write File, FileNotFound!", e);
} catch (SerializationException e) {
statusHandler.handle(Priority.PROBLEM, "Serialization exception writing file", e);
throw new Exception("Unable to write File, Serialization!", e);
} catch (IOException e) {
statusHandler.handle(Priority.PROBLEM, "IO Exception creating file", e);
throw new Exception("Unable to write File, IO!", e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
statusHandler.handle(Priority.PROBLEM, "Problem closing the stream!", e);
}
}
}
}
/**
* Send the path to QPID
* @param path
* @param route
*/
private static void sendPath(String path, String route) throws Exception {
try {
byte[] bytes = SerializationUtil.transformToThrift(mio);
EDEXUtil.getMessageProducer().sendAsyncUri(
route, bytes);
route, path);
} catch (EdexException e) {
statusHandler.handle(Priority.PROBLEM,
e.getLocalizedMessage(), e);
throw new Exception("Unable to send Path message, EdexException!", e);
}
}
/**
* Get the file from the path
* @param path
* @param route
*/
public static MadisIngestObject getObject(String path) {
FileInputStream fis = null;
MadisIngestObject mio = null;
try {
fis = new FileInputStream(new File(path));
mio = SerializationUtil.transformFromThrift(MadisIngestObject.class, fis);
} catch (FileNotFoundException e) {
statusHandler.handle(Priority.PROBLEM,
"Couldn't find the file", e);
} catch (SerializationException e) {
statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage(), e);
statusHandler.handle(Priority.PROBLEM, "Couldn't de-serialize the file", e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
statusHandler.handle(Priority.PROBLEM,
"Problem closing the stream!", e);
}
}
}
return mio;
}
/**
* Gets the filePath and sends to queue and disk
* @param mio
*/
private void sendFile(MadisIngestObject mio) {
StringBuilder filePath = new StringBuilder();
filePath.append(pathPrefix).append(UUID.randomUUID().toString()).append(pathSuffix);
String path = filePath.toString();
try {
sendObject(mio, path);
sendPath(path, madisRoute);
} catch (Exception e) {
statusHandler.handle(Priority.ERROR,
"Could not write file or place message on queue!", e);
}
}
/**
* Cleans up any orphaned files that might be hanging around
*/
public void fileCleaner() {
statusHandler.handle(Priority.INFO, "Checking for orphaned files.");
// Checking for orphaned files in the madis drop directory
try {
long currentTimeMinusOne = System.currentTimeMillis()
- (timeback * (TimeUtil.MILLIS_PER_HOUR));
File madisDir = new File(pathPrefix);
if (madisDir.isDirectory()) {
File[] files = madisDir.listFiles();
for (File file : files) {
if (file.lastModified() < currentTimeMinusOne) {
file.delete();
statusHandler.handle(Priority.WARN,
"Deleting orphaned file! " + file.getName());
}
}
}
} catch (Exception e) {
statusHandler.handle(Priority.ERROR,
"Coudn't check for orphaned files." + e);
}
}