Merge "Omaha #4868 - Add multiple subgrids, make grib thread and memory dynamic by number of cores." into omaha_16.2.1

Former-commit-id: 2a4cc93ab1d7a31e3181f026a013aa8fd46945e4
This commit is contained in:
Nate Jensen 2015-09-24 11:38:56 -05:00 committed by Gerrit Code Review
commit 9927049254
15 changed files with 3526 additions and 3246 deletions

View file

@ -18,8 +18,28 @@
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
numCores=`grep -c ^processor /proc/cpuinfo`
let "GRIB_DECODE_THREADS=numCores / 2"
if [ $GRIB_DECODE_THREADS -gt 12 ]; then
GRIB_DECODE_THREADS=12
elif [ $GRIB_DECODE_THREADS -lt 4 ]; then
GRIB_DECODE_THREADS=4
fi
# sets bounds based on # of threads available
let "MAX_MEM = GRIB_DECODE_THREADS * 128" # in Meg
let "GRIB_MAX_GRID_POINTS = GRIB_DECODE_THREADS * 2000000"
let "GRIB_PERSIST_THREADS = GRIB_DECODE_THREADS / 2"
let "GRIB_MAX_PERSIST_MEMORY_IN_MB = GRIB_PERSIST_THREADS * 50"
export INIT_MEM=128 # in Meg
export MAX_MEM=1024 # in Meg
export MAX_MEM
export GRIB_MAX_GRID_POINTS
export GRIB_PERSIST_THREADS
export GRIB_MAX_PERSIST_MEMORY_IN_MB
export GRIB_DECODE_THREADS
export METADATA_POOL_MAX=10
export EDEX_DEBUG_PORT=5007

View file

@ -6,8 +6,8 @@
<bean id="gribDecoder" class="com.raytheon.edex.plugin.grib.GribDecoder" />
<bean id="gribGridPointLock" class="com.raytheon.edex.plugin.grib.GribGridPointLock">
<constructor-arg value="${grib-decode.count.gridpoints}"/>
<constructor-arg value="${grib-decode.count.threads}"/>
<constructor-arg value="${GRIB_MAX_GRID_POINTS}"/>
<constructor-arg value="${GRIB_DECODE_THREADS}"/>
</bean>
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter" />
@ -31,8 +31,8 @@
<bean id="gribPersister" class="com.raytheon.edex.plugin.grib.GribPersister">
<constructor-arg value="grid"/>
<constructor-arg value="${grib-persister.count.threads}"/>
<constructor-arg value="${grib-persister.count.mb}"/>
<constructor-arg value="${GRIB_PERSIST_THREADS}"/>
<constructor-arg value="${GRIB_MAX_PERSIST_MEMORY_IN_MB}"/>
</bean>
<bean factory-bean="contextManager" factory-method="registerContextStateProcessor">
@ -43,8 +43,8 @@
<camelContext id="grib-decode" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">
<endpoint id="gribSplitJmsEndpoint" uri="jms-durable:queue:Ingest.GribSplit?concurrentConsumers=${grib-split.count.threads}"/>
<endpoint id="gribDecodeJmsEndpoint" uri="jms-durable:queue:Ingest.GribDecode?concurrentConsumers=${grib-decode.count.threads}"/>
<endpoint id="gribSplitJmsEndpoint" uri="jms-durable:queue:Ingest.GribSplit?concurrentConsumers=${GRIB_SPLIT_THREADS}"/>
<endpoint id="gribDecodeJmsEndpoint" uri="jms-durable:queue:Ingest.GribDecode?concurrentConsumers=${GRIB_DECODE_THREADS}"/>
<!-- Begin Grib Decode Route -->
<route id="gribSplitIngestRoute">

View file

@ -1,17 +1,23 @@
# the number of grib split threads.
grib-split.count.threads=2
# Override the below settings in a local site file to fine
# tune the grib decoder. Default values are dynamic based
# on the number of cores in the system. They are defined
# in /awips2/edex/etc/ingestGrib.sh
# the number of grib split threads, this is the only value
# that is not dynamically updated based on core count
GRIB_SPLIT_THREADS=2
# the number of grib decode threads.
grib-decode.count.threads=4
#GRIB_DECODE_THREADS=4
# the number of grib persist threads.
grib-persister.count.threads=4
#GRIB_PERSIST_THREADS=2
# Maximum number of grid points to decode at one time for all threads. Large
# grib files may cause the decoder to reach this limit and then some threads
# will have to wait. This can be used to control the amount of memory used by
# the decoder.
grib-decode.count.gridpoints=12000000
#GRIB_MAX_GRID_POINTS=8000000
# Maximum number of grids in MB that are allowed to be in memory waiting to be persisted.
grib-persister.count.mb=200
#grib-persister.count.mb=100

View file

@ -69,6 +69,8 @@ public class GribPersister implements IContextStateProcessor {
private final Map<Thread, String> inProcessFiles = new HashMap<>(4, 1);
private final Object waitLock = new Object();
private volatile boolean running = true;
public IHDFFilePathProvider pathProvider;
@ -83,38 +85,24 @@ public class GribPersister implements IContextStateProcessor {
private final GribPersistThread[] persistThreads;
public GribPersister(String pluginName, String numThreads,
String maxGridsInMb) {
public GribPersister(String pluginName, int numThreads, int maxGridsInMb) {
pathProvider = PluginFactory.getInstance().getPathProvider(pluginName);
int numPersistThreads = 0;
try {
numPersistThreads = Integer.parseInt(numThreads);
} catch (NumberFormatException e) {
// ignore
}
if (numPersistThreads <= 0) {
numPersistThreads = 4;
if (numThreads <= 0) {
statusHandler.warn("Invalid numThreads [" + numThreads
+ "], using default of [" + numPersistThreads + "]");
+ "], using default of [" + 4 + "]");
numThreads = 4;
}
int maxInMb = 0;
try {
maxInMb = Integer.parseInt(maxGridsInMb);
} catch (NumberFormatException e) {
// ignore
}
if (maxInMb <= 0) {
maxInMb = 100;
if (maxGridsInMb <= 0) {
statusHandler.warn("Invalid maxGridInMb [" + maxGridsInMb
+ "], using default of [" + maxInMb + "]");
+ "], using default of [" + 100 + "]");
maxGridsInMb = 100;
}
maxBytesInMemory = maxInMb * 1024l * 1024l;
maxBytesInMemory = maxGridsInMb * 1024L * 1024L;
persistThreads = new GribPersistThread[numPersistThreads];
persistThreads = new GribPersistThread[numThreads];
for (int i = 0; i < persistThreads.length; i++) {
persistThreads[i] = new GribPersistThread();
persistThreads[i].setName("GribPersist-" + (i + 1));
@ -153,65 +141,74 @@ public class GribPersister implements IContextStateProcessor {
private boolean addPendingRecords(@Headers Map<String, Object> headers,
GridRecord[] records) {
if (records != null) {
StringBuilder path = new StringBuilder();
StringBuilder pathBuilder = new StringBuilder();
String[] paths = new String[records.length];
long bytesForRecords = 0;
for (int i = 0; i < records.length; i++) {
pathBuilder.setLength(0);
GridRecord record = records[i];
String plugin = record.getPluginName();
pathBuilder.append(pathProvider.getHDFPath(plugin, record))
.append(File.separatorChar)
.append(pathProvider.getHDFFileName(plugin, record));
paths[i] = pathBuilder.toString();
// set processing time to this point
Long dequeueTime = (Long) headers.get("dequeueTime");
if (dequeueTime != null) {
long processingTime = System.currentTimeMillis()
- dequeueTime;
headers.put("processingTime", processingTime);
}
/*
* since grids will be bulk stored by file, track the original
* headers for purposes of logging and stats
*/
record.addExtraAttribute(HEADERS_ATTRIBUTE, headers);
bytesForRecords += ((float[]) record.getMessageData()).length * 4;
}
synchronized (gridsByFile) {
if (!running) {
return false;
}
for (GridRecord record : records) {
String plugin = record.getPluginName();
path.setLength(0);
path.append(pathProvider.getHDFPath(plugin, record))
.append(File.separatorChar)
.append(pathProvider.getHDFFileName(plugin, record));
String filePath = path.toString();
List<GridRecord> recs = gridsByFile.get(filePath);
for (int i = 0; i < records.length; i++) {
String path = paths[i];
List<GridRecord> recs = gridsByFile.get(path);
if (recs == null) {
recs = new LinkedList<>();
gridsByFile.put(filePath, recs);
gridsByFile.put(path, recs);
}
recs.add(record);
// set processing time to this point
Long dequeueTime = (Long) headers.get("dequeueTime");
if (dequeueTime != null) {
long processingTime = System.currentTimeMillis()
- dequeueTime;
headers.put("processingTime", processingTime);
}
/*
* since grids will be bulk stored by file, track the
* original headers for purposes of logging and stats
*/
record.addExtraAttribute(HEADERS_ATTRIBUTE, headers);
// update bytesInMemory
bytesInMemory += ((float[]) record.getMessageData()).length * 4;
recs.add(records[i]);
}
// wake up a sleeping persist thread
gridsByFile.notify();
gridsPending += records.length;
// wake up any sleeping persist threads
gridsByFile.notifyAll();
boolean logMessage = true;
// update bytesInMemory
bytesInMemory += bytesForRecords;
if (bytesInMemory > maxBytesInMemory) {
statusHandler.info("Max Grids in memory for "
+ getClass().getSimpleName()
+ " exceeded. Waiting for grids to process.");
while (bytesInMemory > maxBytesInMemory) {
if (logMessage) {
statusHandler.info("Max Grids in memory for "
+ getClass().getName()
+ " exceeded. Waiting for grids to process");
logMessage = false;
synchronized (waitLock) {
try {
waitLock.wait();
} catch (InterruptedException e) {
// ignore
}
}
try {
gridsByFile.wait();
} catch (InterruptedException e) {
// ignore
}
statusHandler
.info("Max Grid lock released. Resuming processing.");
}
}
@ -278,7 +275,7 @@ public class GribPersister implements IContextStateProcessor {
private class GribPersistThread extends Thread {
@Override
public void run() {
String logMsg = "Processed %d grid%s to %s in %s. %d grid%s pending, %d grid%s in process on other threads, %s in memory";
String logMsg = "Processed %d grid(s) to %s in %s. %d grid(s) pending, %d grid(s) in process on other threads, %s in memory";
String file = null;
List<GridRecord> recordsToStore = null;
long timeToStore = System.currentTimeMillis();
@ -375,20 +372,26 @@ public class GribPersister implements IContextStateProcessor {
synchronized (gridsByFile) {
inProcessFiles.remove(this);
bytesInMemory -= bytesFree;
bytesUsedByGrids = bytesInMemory;
gridsInProcess -= numRecords;
gridsStoringOnOtherThreads = gridsInProcess;
gridsLeft = gridsPending;
gridsByFile.notifyAll();
long oldBytes = bytesInMemory;
bytesInMemory -= bytesFree;
bytesUsedByGrids = bytesInMemory;
if ((oldBytes > maxBytesInMemory)
&& (bytesInMemory < maxBytesInMemory)) {
// wake any pending decode threads
synchronized (waitLock) {
waitLock.notifyAll();
}
}
}
statusHandler.info(String.format(logMsg, numRecords,
(numRecords == 1 ? "" : "s"), file,
TimeUtil.prettyDuration(timeToStore),
gridsLeft, (gridsLeft == 1 ? "" : "s"),
gridsStoringOnOtherThreads,
(gridsStoringOnOtherThreads == 1 ? "" : "s"),
file, TimeUtil.prettyDuration(timeToStore),
gridsLeft, gridsStoringOnOtherThreads,
SizeUtil.prettyByteSize(bytesUsedByGrids)));
}
}
@ -411,11 +414,12 @@ public class GribPersister implements IContextStateProcessor {
running = false;
synchronized (gridsByFile) {
gridsByFile.notifyAll();
if (gridsByFile.size() > 0) {
statusHandler.info("Waiting for " + gridsByFile.size()
+ " hdf5 files to be persisted");
}
gridsByFile.notifyAll();
}
for (GribPersistThread thread : persistThreads) {

View file

@ -0,0 +1,30 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>AK-RTMA3</modelNames>
<referenceGrid>1023</referenceGrid>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>
-->
</subGridDef>

View file

@ -0,0 +1,30 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>EKDMOS</modelNames>
<referenceGrid>184</referenceGrid>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>
-->
</subGridDef>

View file

@ -0,0 +1,30 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>EKDMOS-AK</modelNames>
<referenceGrid>1023</referenceGrid>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>
-->
</subGridDef>

View file

@ -1,9 +1,28 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>ESTOFS</modelNames>
<modelNames>ESTOFS estofsEP estofsUS</modelNames>
<referenceGrid>184</referenceGrid>
<nx>800</nx>
<ny>1000</ny>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>

View file

@ -0,0 +1,30 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>ETSS</modelNames>
<referenceGrid>184</referenceGrid>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>
-->
</subGridDef>

View file

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>HPCGuide</modelNames>
<referenceGrid>184</referenceGrid>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>46.0</centerLatitude>
<centerLongitude>-95.5</centerLongitude>
-->
</subGridDef>

View file

@ -0,0 +1,30 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>NamDNG25</modelNames>
<referenceGrid>184</referenceGrid>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>
-->
</subGridDef>

View file

@ -0,0 +1,30 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
This_software_was_developed_and_/_or_modified_by_Raytheon_Company,
pursuant_to_Contract_DG133W-05-CQ-1067_with_the_US_Government.
U.S._EXPORT_CONTROLLED_TECHNICAL_DATA
This_software_product_contains_export-restricted_data_whose
export/transfer/disclosure_is_restricted_by_U.S._law._Dissemination
to_non-U.S._persons_whether_in_the_United_States_or_abroad_requires
an_export_license_or_other_authorization.
Contractor_Name:________Raytheon_Company
Contractor_Address:_____6825_Pine_Street,_Suite_340
________________________Mail_Stop_B8
________________________Omaha,_NE_68106
________________________402.291.0100
See_the_AWIPS_II_Master_Rights_File_("Master_Rights_File.pdf")_for
further_licensing_information.
-->
<subGridDef>
<modelNames>RTGSSTHR</modelNames>
<referenceGrid>173</referenceGrid>
<nx>250</nx>
<ny>250</ny>
<!--
<centerLatitude>36.5</centerLatitude>
<centerLongitude>-81</centerLongitude>
-->
</subGridDef>

View file

@ -21,8 +21,8 @@
<subGridDef>
<modelNames>MOSGuide</modelNames>
<referenceGrid>184</referenceGrid>
<nx>700</nx>
<ny>700</ny>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>46.0</centerLatitude>
<centerLongitude>-95.5</centerLongitude>

View file

@ -21,8 +21,8 @@
<subGridDef>
<modelNames>MOSGuide-AK</modelNames>
<referenceGrid>1023</referenceGrid>
<nx>1649</nx>
<ny>1105</ny>
<nx>500</nx>
<ny>500</ny>
<!--
<centerLatitude>46.0</centerLatitude>
<centerLongitude>-95.5</centerLongitude>