Merge "Issue #1804 Switch all radar decompressing to be in memory. Change-Id: I4b7ce2d0280687953c434d1d1b59edb8e147b221" into omaha_13.3.1
Former-commit-id:d747686fa1
[formerly 855f05b2121c17c71d13495c9aacb12878757af6] Former-commit-id:bcf676f5dd
This commit is contained in:
commit
ac58a04170
5 changed files with 57 additions and 185 deletions
|
@ -19,11 +19,6 @@
|
|||
**/
|
||||
package com.raytheon.edex.plugin.radar;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.zip.DataFormatException;
|
||||
|
@ -45,6 +40,8 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 11, 2010 mnash Initial creation
|
||||
* Jul 16, 2012 DR 14723 D.Friedman Decompress files atomically
|
||||
* Mar 20, 2013 1804 bsteffen Switch all radar decompressing to be in
|
||||
* memory.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -66,6 +63,24 @@ public class RadarDecompressor {
|
|||
.compile("([A-Z]{4}[0-9]{2} [A-Z]{4} [0-9]{6})\\x0D\\x0D\\x0A(\\w{6})\\x0D\\x0D\\x0A");
|
||||
|
||||
public byte[] decompress(byte[] messageData, Headers headers) {
|
||||
return decompressImpl(messageData, headers, false);
|
||||
}
|
||||
|
||||
public byte[] decompressWithHeader(byte[] messageData, Headers headers) {
|
||||
return decompressImpl(messageData, headers, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* decompress the radar data in messageData.
|
||||
*
|
||||
* @param messageData
|
||||
* @param headers
|
||||
* @param keepHeader
|
||||
* If true, keep any WMO/AWIPS heading found in file
|
||||
* @return
|
||||
*/
|
||||
public byte[] decompressImpl(byte[] messageData, Headers headers,
|
||||
boolean keepHeader) {
|
||||
byte[] radarData = null;
|
||||
try {
|
||||
int wmoHeaderSize;
|
||||
|
@ -79,10 +94,23 @@ public class RadarDecompressor {
|
|||
|
||||
if (isCompressed(messageData, wmoHeaderSize)) {
|
||||
radarData = decompressRadar(messageData, wmoHeaderSize, headers);
|
||||
} else {
|
||||
if (keepHeader) {
|
||||
// put the header back on.
|
||||
byte[] radarDataWithHeader = new byte[radarData.length
|
||||
+ wmoHeaderSize];
|
||||
System.arraycopy(messageData, 0, radarDataWithHeader, 0,
|
||||
wmoHeaderSize);
|
||||
System.arraycopy(radarData, 0, radarDataWithHeader,
|
||||
wmoHeaderSize, radarData.length);
|
||||
radarData = radarDataWithHeader;
|
||||
}
|
||||
} else if (!keepHeader && wmoHeaderSize > 0) {
|
||||
// strip the header.
|
||||
radarData = new byte[messageData.length - wmoHeaderSize];
|
||||
System.arraycopy(messageData, wmoHeaderSize, radarData, 0,
|
||||
radarData.length);
|
||||
} else {
|
||||
radarData = messageData;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
theHandler.handle(Priority.ERROR, "Failed decompression on "
|
||||
|
@ -124,106 +152,6 @@ public class RadarDecompressor {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decompress file atomically.
|
||||
*
|
||||
* @param file
|
||||
* @param headers
|
||||
* @param keepHeader If true, keep any WMO/AWIPS heading found in file
|
||||
* @return
|
||||
*/
|
||||
private File decompressToFileImpl(File file, Headers headers, boolean keepHeader) {
|
||||
byte[] messageData = null;
|
||||
FileInputStream input = null;
|
||||
|
||||
try {
|
||||
input = new FileInputStream(file);
|
||||
int fileSize = (int) input.getChannel().size();
|
||||
messageData = new byte[fileSize];
|
||||
input.read(messageData);
|
||||
} catch (FileNotFoundException e) {
|
||||
theHandler.handle(Priority.ERROR, e.getMessage());
|
||||
} catch (IOException e) {
|
||||
theHandler.handle(Priority.ERROR, e.getMessage());
|
||||
} finally {
|
||||
if (input != null) {
|
||||
try {
|
||||
input.close();
|
||||
} catch (IOException e) {
|
||||
theHandler.handle(Priority.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: If reading fails, the code below will NPE. Is this
|
||||
* done intentionally to stop processing?
|
||||
*/
|
||||
|
||||
String headerSearch = "";
|
||||
int start = 0;
|
||||
if (messageData.length < 80) {
|
||||
} else {
|
||||
// skip the WMO header if any
|
||||
headerSearch = new String(messageData, 0, 80);
|
||||
start = findStartRadarData(headerSearch);
|
||||
headerSearch = headerSearch.substring(0, start);
|
||||
}
|
||||
|
||||
messageData = decompress(messageData, headers);
|
||||
|
||||
FileOutputStream output = null;
|
||||
File tmpFile = null;
|
||||
try {
|
||||
tmpFile = File.createTempFile(file.getName() + ".", ".decompress", file.getParentFile());
|
||||
output = new FileOutputStream(tmpFile);
|
||||
if (keepHeader)
|
||||
output.write(headerSearch.getBytes());
|
||||
output.write(messageData);
|
||||
output.close();
|
||||
output = null;
|
||||
if (tmpFile.renameTo(file))
|
||||
tmpFile = null;
|
||||
else
|
||||
theHandler.handle(Priority.ERROR,
|
||||
String.format("Cannot rename %s to %s", tmpFile, file));
|
||||
} catch (IOException e) {
|
||||
theHandler.handle(Priority.ERROR, e.getMessage());
|
||||
} finally {
|
||||
if (output != null)
|
||||
try {
|
||||
output.close();
|
||||
} catch (IOException e) {
|
||||
theHandler.handle(Priority.ERROR, "error closing file", e);
|
||||
}
|
||||
if (tmpFile != null)
|
||||
tmpFile.delete();
|
||||
}
|
||||
return file;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for things that need to write the data back out to a file
|
||||
*
|
||||
* @param messageData
|
||||
* @return
|
||||
*/
|
||||
public File decompressToFile(File file, Headers headers) {
|
||||
return decompressToFileImpl(file, headers, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for things that need to write the data back out to a file, without a
|
||||
* header. Same as decompressToFile, but will strip the header off before
|
||||
* writing it back out.
|
||||
*
|
||||
* @param messageData
|
||||
* @return
|
||||
*/
|
||||
public File decompressToFileWithoutHeader(File file, Headers headers) {
|
||||
return decompressToFileImpl(file, headers, false);
|
||||
}
|
||||
|
||||
private int findStartRadarData(String headerInfo) {
|
||||
int startOfRadarData = 0;
|
||||
Matcher matcher = WMO_PATTERN.matcher(headerInfo);
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
<doTry>
|
||||
<pipeline>
|
||||
<bean ref="stringToFile" />
|
||||
<bean ref="radarDecompressor" method="decompressToFile" />
|
||||
<bean ref="radarDecompressor" method="decompressWithHeader" />
|
||||
<bean ref="dpaDecodeSrv" method="process"/>
|
||||
<!-- Uncomment when dpaDecodeSrv route properly handles only its own files
|
||||
<bean ref="processUtil" method="log"/>
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
<pipeline>
|
||||
<bean ref="setIngestHeaderFields"/>
|
||||
<bean ref="stringToFile" />
|
||||
<bean ref="radarDecompressor" method="decompressToFile" />
|
||||
<bean ref="radarDecompressor" method="decompressWithHeader" />
|
||||
<bean ref="dhrDecodeSrv" method="filter" />
|
||||
</pipeline>
|
||||
<doCatch>
|
||||
|
|
|
@ -20,10 +20,8 @@
|
|||
|
||||
package com.raytheon.uf.edex.ohd.pproc;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -35,6 +33,7 @@ import java.util.regex.Pattern;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.raytheon.edex.esb.Headers;
|
||||
import com.raytheon.uf.common.ohd.AppsDefaults;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.edex.core.EdexException;
|
||||
|
@ -50,6 +49,9 @@ import com.raytheon.uf.edex.ohd.MainMethod;
|
|||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Nov 14, 2008 bphillip Initial creation
|
||||
* Mar 20, 2013 1804 bsteffen Switch all radar decompressing to be in
|
||||
* memory.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -71,9 +73,10 @@ public class DecodeDpaSrv {
|
|||
|
||||
private CoreDao dao;
|
||||
|
||||
public Object process(File file) throws EdexException {
|
||||
public Object process(byte[] message, Headers headers) throws EdexException {
|
||||
boolean proc = false;
|
||||
proc = checkFile(file);
|
||||
File ingestFile = new File(headers.get("ingestfilename").toString());
|
||||
proc = checkFile(message, ingestFile.getName());
|
||||
if (proc == false) {
|
||||
return null;
|
||||
}
|
||||
|
@ -110,30 +113,8 @@ public class DecodeDpaSrv {
|
|||
* @throws EdexException
|
||||
* If IOExceptions occur
|
||||
*/
|
||||
private boolean checkFile(File dpaFile) throws EdexException {
|
||||
|
||||
/*
|
||||
* Read the contents of the file into memory.
|
||||
*/
|
||||
BufferedInputStream inStream = null;
|
||||
try {
|
||||
inStream = new BufferedInputStream(new FileInputStream(dpaFile));
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new EdexException("Cannot find file: " + dpaFile, e);
|
||||
}
|
||||
byte[] fileContents = new byte[(int) dpaFile.length()];
|
||||
try {
|
||||
inStream.read(fileContents);
|
||||
} catch (IOException e) {
|
||||
throw new EdexException("Error reading file: " + dpaFile, e);
|
||||
} finally {
|
||||
try {
|
||||
inStream.close();
|
||||
} catch (IOException e1) {
|
||||
throw new EdexException("Error closing stream to file: "
|
||||
+ dpaFile, e1);
|
||||
}
|
||||
}
|
||||
private boolean checkFile(byte[] fileContents, String fileName)
|
||||
throws EdexException {
|
||||
|
||||
/*
|
||||
* Copy off the first few bytes to see if leading bytes are present
|
||||
|
@ -174,8 +155,7 @@ public class DecodeDpaSrv {
|
|||
if (offset != 0) {
|
||||
BufferedOutputStream outStream = null;
|
||||
try {
|
||||
outFile = new File(FileUtil.join(outPath,
|
||||
dpaFile.getName()));
|
||||
outFile = new File(FileUtil.join(outPath, fileName));
|
||||
outStream = new BufferedOutputStream(
|
||||
new FileOutputStream(outFile));
|
||||
} catch (FileNotFoundException e) {
|
||||
|
@ -187,7 +167,7 @@ public class DecodeDpaSrv {
|
|||
outStream.write(fileContents, offset,
|
||||
fileContents.length - offset);
|
||||
logger.info("Re-writing contents of file: "
|
||||
+ dpaFile + " to " + outFile);
|
||||
+ fileName + " to " + outFile);
|
||||
} catch (IOException e) {
|
||||
throw new EdexException(
|
||||
"Error writing updated contents of DPA file: "
|
||||
|
@ -209,18 +189,18 @@ public class DecodeDpaSrv {
|
|||
return false;
|
||||
}
|
||||
} else {
|
||||
String radarid = dpaFile.getName().substring(1, 4).toUpperCase();
|
||||
String radarid = fileName.substring(1, 4).toUpperCase();
|
||||
String query = String
|
||||
.format("select * from radarloc where radid='%s' and use_radar='T' ",
|
||||
radarid);
|
||||
dao = new CoreDao(DaoConfig.forDatabase("ihfs"));
|
||||
Object[] rs = dao.executeSQLQuery(query);
|
||||
if (rs.length > 0) {
|
||||
outFile = new File(FileUtil.join(outPath, dpaFile.getName()));
|
||||
outFile = new File(FileUtil.join(outPath, fileName));
|
||||
logger.info("No header found for file: " + outFile
|
||||
+ " decoding with filename.");
|
||||
try {
|
||||
FileUtil.copyFile(dpaFile, outFile);
|
||||
FileUtil.bytes2File(fileContents, outFile);
|
||||
} catch (IOException e) {
|
||||
throw new EdexException(
|
||||
"Error copying file to destination directory: "
|
||||
|
|
|
@ -20,16 +20,15 @@
|
|||
|
||||
package com.raytheon.uf.edex.ohd.pproc;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import com.raytheon.edex.esb.Headers;
|
||||
import com.raytheon.uf.common.ohd.AppsDefaults;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
|
@ -53,6 +52,9 @@ import com.raytheon.uf.edex.ohd.MainMethod;
|
|||
* Jan 20, 2010 4200 snaples Initial creation
|
||||
* Mar 09, 2012 417 dgilling Refactor to use two-stage queue
|
||||
* process.
|
||||
* Mar 20, 2013 1804 bsteffen Switch all radar decompressing to be in
|
||||
* memory.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author snaples
|
||||
|
@ -174,23 +176,15 @@ public class HPEDhrSrv {
|
|||
* @param hpeFile
|
||||
* The radar file to check.
|
||||
*/
|
||||
public void filter(File hpeFile) {
|
||||
public void filter(byte[] fileContents, Headers headers) {
|
||||
// logger.info("Starting HPE Check message.");
|
||||
byte[] fileContents = new byte[0];
|
||||
try {
|
||||
fileContents = readHpeFile(hpeFile);
|
||||
} catch (FileNotFoundException e) {
|
||||
logger.handle(Priority.PROBLEM,
|
||||
"HPE Cannot find file: " + hpeFile.toString(), e);
|
||||
} catch (IOException e) {
|
||||
logger.handle(Priority.PROBLEM, "HPE Error reading file: "
|
||||
+ hpeFile.toString(), e);
|
||||
}
|
||||
|
||||
if (fileContents.length < 80) {
|
||||
return;
|
||||
}
|
||||
|
||||
File hpeFile = new File(headers.get("ingestfilename").toString());
|
||||
|
||||
// check header
|
||||
String fileStartStr = new String(fileContents, 0, 80);
|
||||
// array will hold radar id, dtype, and dt information. using array so
|
||||
|
@ -218,36 +212,6 @@ public class HPEDhrSrv {
|
|||
// logger.info("Finished HPE CheckFile. ");
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the given radar file to memory for later processing by the
|
||||
* <code>filter</code> function.
|
||||
*
|
||||
* @param hpeFile
|
||||
* The file to read.
|
||||
* @return The contents of the file.
|
||||
* @throws FileNotFoundException
|
||||
* If the specified file does not exist or cannot be opened.
|
||||
* @throws IOException
|
||||
* If an I/O error occurs while reading the file.
|
||||
*/
|
||||
private byte[] readHpeFile(File hpeFile) throws FileNotFoundException,
|
||||
IOException {
|
||||
BufferedInputStream inStream = null;
|
||||
byte[] fileContents = null;
|
||||
|
||||
try {
|
||||
inStream = new BufferedInputStream(new FileInputStream(hpeFile));
|
||||
fileContents = new byte[(int) hpeFile.length()];
|
||||
inStream.read(fileContents);
|
||||
} finally {
|
||||
if (inStream != null) {
|
||||
inStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
return fileContents;
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the given parameters and constructs a <code>HPEDhrMessage</code> to
|
||||
* be placed onto the queue used by <code>HPEDhrSrv</code> for actual data
|
||||
|
|
Loading…
Add table
Reference in a new issue