Merge "Issue #1804 Switch all radar decompressing to be in memory. Change-Id: I4b7ce2d0280687953c434d1d1b59edb8e147b221" into omaha_13.3.1

Former-commit-id: 510e56c67f [formerly d747686fa1] [formerly bcf676f5dd [formerly 855f05b2121c17c71d13495c9aacb12878757af6]]
Former-commit-id: bcf676f5dd
Former-commit-id: ac58a04170
This commit is contained in:
Richard Peter 2013-03-20 17:26:31 -05:00 committed by Gerrit Code Review
commit 855dd1a664
5 changed files with 57 additions and 185 deletions

View file

@ -19,11 +19,6 @@
**/
package com.raytheon.edex.plugin.radar;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.DataFormatException;
@ -45,6 +40,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* ------------ ---------- ----------- --------------------------
* Nov 11, 2010 mnash Initial creation
* Jul 16, 2012 DR 14723 D.Friedman Decompress files atomically
* Mar 20, 2013 1804 bsteffen Switch all radar decompressing to be in
* memory.
*
* </pre>
*
@ -66,6 +63,24 @@ public class RadarDecompressor {
.compile("([A-Z]{4}[0-9]{2} [A-Z]{4} [0-9]{6})\\x0D\\x0D\\x0A(\\w{6})\\x0D\\x0D\\x0A");
public byte[] decompress(byte[] messageData, Headers headers) {
return decompressImpl(messageData, headers, false);
}
public byte[] decompressWithHeader(byte[] messageData, Headers headers) {
return decompressImpl(messageData, headers, true);
}
/**
* decompress the radar data in messageData.
*
* @param messageData
* @param headers
* @param keepHeader
* If true, keep any WMO/AWIPS heading found in file
* @return
*/
public byte[] decompressImpl(byte[] messageData, Headers headers,
boolean keepHeader) {
byte[] radarData = null;
try {
int wmoHeaderSize;
@ -79,10 +94,23 @@ public class RadarDecompressor {
if (isCompressed(messageData, wmoHeaderSize)) {
radarData = decompressRadar(messageData, wmoHeaderSize, headers);
} else {
if (keepHeader) {
// put the header back on.
byte[] radarDataWithHeader = new byte[radarData.length
+ wmoHeaderSize];
System.arraycopy(messageData, 0, radarDataWithHeader, 0,
wmoHeaderSize);
System.arraycopy(radarData, 0, radarDataWithHeader,
wmoHeaderSize, radarData.length);
radarData = radarDataWithHeader;
}
} else if (!keepHeader && wmoHeaderSize > 0) {
// strip the header.
radarData = new byte[messageData.length - wmoHeaderSize];
System.arraycopy(messageData, wmoHeaderSize, radarData, 0,
radarData.length);
} else {
radarData = messageData;
}
} catch (Exception e) {
theHandler.handle(Priority.ERROR, "Failed decompression on "
@ -124,106 +152,6 @@ public class RadarDecompressor {
return false;
}
/**
* Decompress file atomically.
*
* @param file
* @param headers
* @param keepHeader If true, keep any WMO/AWIPS heading found in file
* @return
*/
private File decompressToFileImpl(File file, Headers headers, boolean keepHeader) {
byte[] messageData = null;
FileInputStream input = null;
try {
input = new FileInputStream(file);
int fileSize = (int) input.getChannel().size();
messageData = new byte[fileSize];
input.read(messageData);
} catch (FileNotFoundException e) {
theHandler.handle(Priority.ERROR, e.getMessage());
} catch (IOException e) {
theHandler.handle(Priority.ERROR, e.getMessage());
} finally {
if (input != null) {
try {
input.close();
} catch (IOException e) {
theHandler.handle(Priority.ERROR, e.getMessage());
}
}
}
/*
* TODO: If reading fails, the code below will NPE. Is this
* done intentionally to stop processing?
*/
String headerSearch = "";
int start = 0;
if (messageData.length < 80) {
} else {
// skip the WMO header if any
headerSearch = new String(messageData, 0, 80);
start = findStartRadarData(headerSearch);
headerSearch = headerSearch.substring(0, start);
}
messageData = decompress(messageData, headers);
FileOutputStream output = null;
File tmpFile = null;
try {
tmpFile = File.createTempFile(file.getName() + ".", ".decompress", file.getParentFile());
output = new FileOutputStream(tmpFile);
if (keepHeader)
output.write(headerSearch.getBytes());
output.write(messageData);
output.close();
output = null;
if (tmpFile.renameTo(file))
tmpFile = null;
else
theHandler.handle(Priority.ERROR,
String.format("Cannot rename %s to %s", tmpFile, file));
} catch (IOException e) {
theHandler.handle(Priority.ERROR, e.getMessage());
} finally {
if (output != null)
try {
output.close();
} catch (IOException e) {
theHandler.handle(Priority.ERROR, "error closing file", e);
}
if (tmpFile != null)
tmpFile.delete();
}
return file;
}
/**
* Used for things that need to write the data back out to a file
*
* @param messageData
* @return
*/
public File decompressToFile(File file, Headers headers) {
return decompressToFileImpl(file, headers, true);
}
/**
* Used for things that need to write the data back out to a file, without a
* header. Same as decompressToFile, but will strip the header off before
* writing it back out.
*
* @param messageData
* @return
*/
public File decompressToFileWithoutHeader(File file, Headers headers) {
return decompressToFileImpl(file, headers, false);
}
private int findStartRadarData(String headerInfo) {
int startOfRadarData = 0;
Matcher matcher = WMO_PATTERN.matcher(headerInfo);

View file

@ -44,7 +44,7 @@
<doTry>
<pipeline>
<bean ref="stringToFile" />
<bean ref="radarDecompressor" method="decompressToFile" />
<bean ref="radarDecompressor" method="decompressWithHeader" />
<bean ref="dpaDecodeSrv" method="process"/>
<!-- Uncomment when dpaDecodeSrv route properly handles only its own files
<bean ref="processUtil" method="log"/>

View file

@ -31,7 +31,7 @@
<pipeline>
<bean ref="setIngestHeaderFields"/>
<bean ref="stringToFile" />
<bean ref="radarDecompressor" method="decompressToFile" />
<bean ref="radarDecompressor" method="decompressWithHeader" />
<bean ref="dhrDecodeSrv" method="filter" />
</pipeline>
<doCatch>

View file

@ -20,10 +20,8 @@
package com.raytheon.uf.edex.ohd.pproc;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@ -35,6 +33,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.raytheon.edex.esb.Headers;
import com.raytheon.uf.common.ohd.AppsDefaults;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.core.EdexException;
@ -50,6 +49,9 @@ import com.raytheon.uf.edex.ohd.MainMethod;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 14, 2008 bphillip Initial creation
* Mar 20, 2013 1804 bsteffen Switch all radar decompressing to be in
* memory.
*
* </pre>
*
* @author bphillip
@ -71,9 +73,10 @@ public class DecodeDpaSrv {
private CoreDao dao;
public Object process(File file) throws EdexException {
public Object process(byte[] message, Headers headers) throws EdexException {
boolean proc = false;
proc = checkFile(file);
File ingestFile = new File(headers.get("ingestfilename").toString());
proc = checkFile(message, ingestFile.getName());
if (proc == false) {
return null;
}
@ -110,30 +113,8 @@ public class DecodeDpaSrv {
* @throws EdexException
* If IOExceptions occur
*/
private boolean checkFile(File dpaFile) throws EdexException {
/*
* Read the contents of the file into memory.
*/
BufferedInputStream inStream = null;
try {
inStream = new BufferedInputStream(new FileInputStream(dpaFile));
} catch (FileNotFoundException e) {
throw new EdexException("Cannot find file: " + dpaFile, e);
}
byte[] fileContents = new byte[(int) dpaFile.length()];
try {
inStream.read(fileContents);
} catch (IOException e) {
throw new EdexException("Error reading file: " + dpaFile, e);
} finally {
try {
inStream.close();
} catch (IOException e1) {
throw new EdexException("Error closing stream to file: "
+ dpaFile, e1);
}
}
private boolean checkFile(byte[] fileContents, String fileName)
throws EdexException {
/*
* Copy off the first few bytes to see if leading bytes are present
@ -174,8 +155,7 @@ public class DecodeDpaSrv {
if (offset != 0) {
BufferedOutputStream outStream = null;
try {
outFile = new File(FileUtil.join(outPath,
dpaFile.getName()));
outFile = new File(FileUtil.join(outPath, fileName));
outStream = new BufferedOutputStream(
new FileOutputStream(outFile));
} catch (FileNotFoundException e) {
@ -187,7 +167,7 @@ public class DecodeDpaSrv {
outStream.write(fileContents, offset,
fileContents.length - offset);
logger.info("Re-writing contents of file: "
+ dpaFile + " to " + outFile);
+ fileName + " to " + outFile);
} catch (IOException e) {
throw new EdexException(
"Error writing updated contents of DPA file: "
@ -209,18 +189,18 @@ public class DecodeDpaSrv {
return false;
}
} else {
String radarid = dpaFile.getName().substring(1, 4).toUpperCase();
String radarid = fileName.substring(1, 4).toUpperCase();
String query = String
.format("select * from radarloc where radid='%s' and use_radar='T' ",
radarid);
dao = new CoreDao(DaoConfig.forDatabase("ihfs"));
Object[] rs = dao.executeSQLQuery(query);
if (rs.length > 0) {
outFile = new File(FileUtil.join(outPath, dpaFile.getName()));
outFile = new File(FileUtil.join(outPath, fileName));
logger.info("No header found for file: " + outFile
+ " decoding with filename.");
try {
FileUtil.copyFile(dpaFile, outFile);
FileUtil.bytes2File(fileContents, outFile);
} catch (IOException e) {
throw new EdexException(
"Error copying file to destination directory: "

View file

@ -20,16 +20,15 @@
package com.raytheon.uf.edex.ohd.pproc;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.raytheon.edex.esb.Headers;
import com.raytheon.uf.common.ohd.AppsDefaults;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
@ -53,6 +52,9 @@ import com.raytheon.uf.edex.ohd.MainMethod;
* Jan 20, 2010 4200 snaples Initial creation
* Mar 09, 2012 417 dgilling Refactor to use two-stage queue
* process.
* Mar 20, 2013 1804 bsteffen Switch all radar decompressing to be in
* memory.
*
* </pre>
*
* @author snaples
@ -174,23 +176,15 @@ public class HPEDhrSrv {
* @param hpeFile
* The radar file to check.
*/
public void filter(File hpeFile) {
public void filter(byte[] fileContents, Headers headers) {
// logger.info("Starting HPE Check message.");
byte[] fileContents = new byte[0];
try {
fileContents = readHpeFile(hpeFile);
} catch (FileNotFoundException e) {
logger.handle(Priority.PROBLEM,
"HPE Cannot find file: " + hpeFile.toString(), e);
} catch (IOException e) {
logger.handle(Priority.PROBLEM, "HPE Error reading file: "
+ hpeFile.toString(), e);
}
if (fileContents.length < 80) {
return;
}
File hpeFile = new File(headers.get("ingestfilename").toString());
// check header
String fileStartStr = new String(fileContents, 0, 80);
// array will hold radar id, dtype, and dt information. using array so
@ -218,36 +212,6 @@ public class HPEDhrSrv {
// logger.info("Finished HPE CheckFile. ");
}
/**
* Reads the given radar file to memory for later processing by the
* <code>filter</code> function.
*
* @param hpeFile
* The file to read.
* @return The contents of the file.
* @throws FileNotFoundException
* If the specified file does not exist or cannot be opened.
* @throws IOException
* If an I/O error occurs while reading the file.
*/
private byte[] readHpeFile(File hpeFile) throws FileNotFoundException,
IOException {
BufferedInputStream inStream = null;
byte[] fileContents = null;
try {
inStream = new BufferedInputStream(new FileInputStream(hpeFile));
fileContents = new byte[(int) hpeFile.length()];
inStream.read(fileContents);
} finally {
if (inStream != null) {
inStream.close();
}
}
return fileContents;
}
/**
* Takes the given parameters and constructs a <code>HPEDhrMessage</code> to
* be placed onto the queue used by <code>HPEDhrSrv</code> for actual data