Merge branch 'master_14.1.1' into omaha_14.1.1

Former-commit-id: 70c79bb7e6 [formerly d1b2cdd566] [formerly 65ebfc1a18] [formerly c0bb3a1102 [formerly 65ebfc1a18 [formerly c333fe23736110bcb273aafd90ee6551ff7e921d]]]
Former-commit-id: c0bb3a1102
Former-commit-id: 44a31523c0eab129c3c048ad551394d20361765b [formerly e8c6af3060]
Former-commit-id: eff32cfbf3
This commit is contained in:
Steve Harris 2013-11-19 15:45:06 -06:00
commit 2ed2e93b03
24 changed files with 1418 additions and 1279 deletions

View file

@ -24,6 +24,10 @@ import java.util.Calendar;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.jface.dialogs.MessageDialog;
import org.eclipse.swt.SWT;
import org.eclipse.swt.events.SelectionAdapter;
@ -71,6 +75,7 @@ import com.raytheon.viz.ui.dialogs.CaveSWTDialog;
* Jul 24, 2013 2220 rferrel Changes to queue size request for all data.
* Aug 01, 2013 2221 rferrel Changes for select configuration.
* Aug 06, 2013 2222 rferrel Changes to display all selected data.
* Nov 14, 2013 2549 rferrel Get category data moved off the UI thread.
* </pre>
*
* @author bgonzale
@ -452,19 +457,49 @@ public abstract class AbstractArchiveDlg extends CaveSWTDialog implements
* adjust sizes on the display table.
*/
protected void populateTableComp() {
String archiveName = getSelectedArchiveName();
String categoryName = getSelectedCategoryName();
final String archiveName = getSelectedArchiveName();
final String categoryName = getSelectedCategoryName();
setCursorBusy(true);
try {
setShowingSelected(false);
tableComp.populateTable(archiveName, categoryName,
new ArrayList<DisplayData>(0));
tableComp.refresh();
List<DisplayData> displayDatas = sizeJob.changeDisplay(archiveName,
categoryName);
Job job = new Job("populate category table") {
@Override
protected IStatus run(IProgressMonitor monitor) {
getCategoryTableData(archiveName, categoryName);
return Status.OK_STATUS;
}
};
job.schedule();
}
/**
* This gets the desired categories data. Assumed called from non-UI thread
* since it is possible getting the data may take time which would hang up
* the UI thread.
*
* @param archiveName
* @param categoryName
*/
private void getCategoryTableData(final String archiveName,
final String categoryName) {
final List<DisplayData> displayDatas = sizeJob.changeDisplay(
archiveName, categoryName);
VizApp.runAsync(new Runnable() {
@Override
public void run() {
try {
if (displayDatas != null) {
tableComp
.populateTable(archiveName, categoryName, displayDatas);
tableComp.populateTable(archiveName, categoryName,
displayDatas);
} else {
tableComp.refresh();
}
@ -472,6 +507,8 @@ public abstract class AbstractArchiveDlg extends CaveSWTDialog implements
setCursorBusy(false);
}
}
});
}
/**
* Set the shells cursor to the desire state.

View file

@ -312,9 +312,24 @@ public class CurrentWarnings {
public AbstractWarningRecord getNewestByTracking(String etn, String phensig) {
AbstractWarningRecord rval = null;
synchronized (officeId) {
List<AbstractWarningRecord> warnings = warningMap.get(toKey(
List<AbstractWarningRecord> keyWarnings = warningMap.get(toKey(
phensig, etn));
if (warnings != null) {
if (keyWarnings != null) {
// filter out "future" warnings.
List<AbstractWarningRecord> warnings = null;
if (SimulatedTime.getSystemTime().isRealTime()) {
warnings = keyWarnings;
} else {
warnings = new ArrayList<AbstractWarningRecord>(
keyWarnings.size());
long currentTime = TimeUtil.newCalendar().getTimeInMillis();
for (AbstractWarningRecord warning : keyWarnings) {
if (warning.getIssueTime().getTimeInMillis() <= currentTime) {
warnings.add(warning);
}
}
}
// See if we have a NEW warning
for (AbstractWarningRecord warning : warnings) {
if (getAction(warning.getAct()) == WarningAction.NEW) {
@ -399,8 +414,7 @@ public class CurrentWarnings {
if (warnings != null) {
Calendar c = TimeUtil.newCalendar();
c.add(Calendar.MINUTE, -10);
TimeRange t = new TimeRange(c.getTime(), SimulatedTime
.getSystemTime().getTime());
TimeRange t = new TimeRange(c.getTime(), TimeUtil.newDate());
for (AbstractWarningRecord warning : warnings) {
if (t.contains(warning.getIssueTime().getTime())) {
@ -438,8 +452,7 @@ public class CurrentWarnings {
ArrayList<AbstractWarningRecord> conProds = new ArrayList<AbstractWarningRecord>();
Calendar c = TimeUtil.newCalendar();
c.add(Calendar.MINUTE, -10);
TimeRange t = new TimeRange(c.getTime(), SimulatedTime
.getSystemTime().getTime());
TimeRange t = new TimeRange(c.getTime(), TimeUtil.newDate());
for (AbstractWarningRecord warning : warnings) {
WarningAction action = getAction(warning.getAct());
if (t.contains(warning.getIssueTime().getTime())
@ -545,12 +558,20 @@ public class CurrentWarnings {
List<AbstractWarningRecord> records = new ArrayList<AbstractWarningRecord>(
recordsMap.values());
// Sort by insert time
// Sort by issue time when null fall back to insert time.
Collections.sort(records, new Comparator<AbstractWarningRecord>() {
@Override
public int compare(AbstractWarningRecord o1,
AbstractWarningRecord o2) {
return o1.getInsertTime().compareTo(o2.getInsertTime());
Calendar c1 = o1.getIssueTime();
if (c1 == null) {
c1 = o1.getInsertTime();
}
Calendar c2 = o2.getIssueTime();
if (c2 == null) {
c2 = o2.getInsertTime();
}
return c1.compareTo(c2);
}
});
@ -602,7 +623,10 @@ public class CurrentWarnings {
Map<String, List<AbstractWarningRecord>> recordMap = new HashMap<String, List<AbstractWarningRecord>>();
for (AbstractWarningRecord rec : newRecords) {
List<AbstractWarningRecord> recs = recordMap.get(rec.getOfficeid());
// This used the key rec.getOfficeid() which can be null; which
// can drop alerts when more then one new Record.
// Changed to use the same key as the put.
List<AbstractWarningRecord> recs = recordMap.get(rec.getXxxid());
if (recs == null) {
recs = new ArrayList<AbstractWarningRecord>();
recordMap.put(rec.getXxxid(), recs);

View file

@ -26,6 +26,7 @@ import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.time.DataTime;
import com.raytheon.uf.common.time.SimulatedTime;
import com.raytheon.uf.common.time.TimeRange;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.viz.core.DrawableString;
import com.raytheon.uf.viz.core.IGraphicsTarget;
import com.raytheon.uf.viz.core.IGraphicsTarget.HorizontalAlignment;
@ -82,6 +83,7 @@ import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory;
* Check if geometry is null when inspecting.
* Jul 22, 2013 2176 jsanchez Updated the wire frame and text for EMERGENCY warnings.
* Sep 4, 2013 2176 jsanchez Made the polygon line width thicker and made regular text not bold.
* Nov 11, 2013 2439 rferrel Changes to prevent getting future warning when in DRT mode.
* </pre>
*
* @author jsanchez
@ -128,7 +130,7 @@ public abstract class AbstractWWAResource extends
protected static PreparedGeometryFactory pgf = new PreparedGeometryFactory();
/** one hour ahead, entirely arbitrary/magic **/
private static final long LAST_FRAME_ADJ = (60 * 60 * 1000);
private static final long LAST_FRAME_ADJ = TimeUtil.MILLIS_PER_HOUR;
protected String resourceName;
@ -465,13 +467,20 @@ public abstract class AbstractWWAResource extends
if (lastFrame) {
// use current system time to determine what to display
Date timeToDisplay = SimulatedTime.getSystemTime().getTime();
Date timeToDisplay = TimeUtil.newDate();
// change frame time
frameTime = timeToDisplay;
// point paint time to different time
paintTime = new DataTime(timeToDisplay);
// point framePeriod to new frame
if (SimulatedTime.getSystemTime().isRealTime()) {
framePeriod = new TimeRange(frameTime, LAST_FRAME_ADJ);
} else {
// Prevent getting "future" records by keeping interval in the
// same minute.
framePeriod = new TimeRange(frameTime,
30 * TimeUtil.MILLIS_PER_SECOND);
}
}
// check if the warning is cancelled

View file

@ -147,4 +147,4 @@ if [ $DEBUG_FLAG == "on" ]; then
echo "To Debug ... Connect to Port: ${EDEX_DEBUG_PORT}."
fi
java -jar ${EDEX_HOME}/bin/yajsw/wrapper.jar -c ${EDEX_HOME}/conf/${CONF_FILE} ${WRAPPER_ARGS}
java -Xmx32m -XX:MaxPermSize=12m -XX:ReservedCodeCacheSize=4m -jar ${EDEX_HOME}/bin/yajsw/wrapper.jar -c ${EDEX_HOME}/conf/${CONF_FILE} ${WRAPPER_ARGS}

View file

@ -10,25 +10,31 @@
<bean id="uriAggregator" class="com.raytheon.uf.edex.esb.camel.DataUriAggregator" />
<bean id="toDataURI" class="com.raytheon.uf.edex.esb.camel.ToDataURI" />
<bean id="persist" class="com.raytheon.edex.services.PersistSrv" factory-method="getInstance"/>
<bean id="index" class="com.raytheon.edex.services.IndexSrv"/>
<bean id="dupElim" class="com.raytheon.edex.ingestsrv.DupElimSrv"/>
<bean id="persist" class="com.raytheon.edex.services.PersistSrv" factory-method="getInstance" />
<bean id="index" class="com.raytheon.edex.services.IndexSrv" />
<bean id="persistCamelRegistered" factory-bean="contextManager"
factory-method="register">
<constructor-arg ref="persist-camel"/>
<bean id="persistCamelRegistered" factory-bean="contextManager" factory-method="register">
<constructor-arg ref="persist-camel" />
</bean>
<camelContext id="persist-camel" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
<camelContext id="persist-camel" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler">
<!-- Generic persist and indexing
Intended for routes that need persisting to HDF5,
Indexing but no alert processing
-->
Indexing but no alert processing -->
<route id="persistIndex">
<from uri="direct-vm:persistIndex"/>
<bean ref="persist" method="persist"/>
<bean ref="index" method="index"/>
<bean ref="processUtil" method="log"/>
<from uri="direct-vm:persistIndex" />
<doTry>
<bean ref="persist" method="persist" />
<bean ref="index" method="index" />
<bean ref="processUtil" method="log" />
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:persist?level=ERROR" />
</doCatch>
</doTry>
</route>
<!-- Generic persist, index and alert route
@ -36,38 +42,40 @@
Indexing and Alerting
-->
<route id="persistIndexAlert">
<from uri="direct-vm:persistIndexAlert"/>
<bean ref="persist" method="persist"/>
<bean ref="index" method="index"/>
<bean ref="processUtil" method="log"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<from uri="direct-vm:persistIndexAlert" />
<doTry>
<bean ref="persist" method="persist" />
<bean ref="index" method="index" />
<bean ref="processUtil" method="log" />
<bean ref="toDataURI" method="toDataURI" />
<to uri="vm:stageNotification" />
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:persist?level=ERROR" />
</doCatch>
</doTry>
</route>
<!-- Generic index and alert route
Intended for routes that need Indexing and Alerting
-->
<route id="indexAlert">
<from uri="direct-vm:indexAlert"/>
<bean ref="index" method="index"/>
<bean ref="processUtil" method="log"/>
<bean ref="toDataURI" method="toDataURI"/>
<to uri="vm:stageNotification"/>
<from uri="direct-vm:indexAlert" />
<doTry>
<bean ref="index" method="index" />
<bean ref="processUtil" method="log" />
<bean ref="toDataURI" method="toDataURI" />
<to uri="vm:stageNotification" />
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:persist?level=ERROR" />
</doCatch>
</doTry>
</route>
<route id="notificationAggregation">
<from uri="vm:stageNotification"/>
<from uri="vm:stageNotification" />
<bean ref="uriAggregator" method="addDataUris" />
<!--
<multicast>
<pipeline>
<bean ref="uriAggregator" method="addDataUris" />
</pipeline>
<pipeline>
<to uri="jms-generic:queue:subscriptions" />
</pipeline>
</multicast>
-->
</route>
<route id="notificationTimer">
@ -76,8 +84,8 @@
<method bean="uriAggregator" method="hasUris" />
<bean ref="uriAggregator" method="sendQueuedUris" />
<bean ref="serializationUtil" method="transformToThrift" />
<to uri="jms-generic:topic:edex.alerts?timeToLive=60000"/>
<to uri="jms-generic:topic:edex.alerts?timeToLive=60000" />
</filter>
</route>
</camelContext>
</beans>
</beans>

View file

@ -0,0 +1,131 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.ingestsrv;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.annotations.DataURIUtil;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.PerformanceStatus;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.CollectionUtil;
import com.raytheon.uf.edex.database.plugin.PluginDao;
import com.raytheon.uf.edex.database.plugin.PluginFactory;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
/**
* Checks database for duplicates of data. Does not account for clustering.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Nov 11, 2013 2478 rjpeter Initial creation
* </pre>
*
* @author rjpeter
* @version 1.0
*/
public class DupElimSrv {
private static final IUFStatusHandler statusHandler = UFStatus
.getHandler(DupElimSrv.class);
private final IPerformanceStatusHandler perfLog = PerformanceStatus
.getHandler("DupElim:");
/**
* Checks the passed pdos against database for existence. If duplicates
* found returns a new array containing only the new plugin data objects. If
* an errors occurs the original pdos array will be returned.
*
* @param pluginName
* @param pdos
* @return
*/
public PluginDataObject[] dupElim(PluginDataObject[] pdos) {
if ((pdos == null) || (pdos.length == 0)) {
return new PluginDataObject[0];
}
ITimer dupCheckTimer = TimeUtil.getTimer();
dupCheckTimer.start();
int numBefore = pdos.length;
String pluginName = pdos[0].getPluginName();
try {
PluginDao dao = PluginFactory.getInstance()
.getPluginDao(pluginName);
List<PluginDataObject> newPdos = new ArrayList<PluginDataObject>(
pdos.length);
// TODO: Bulk querying, groups of 100 using IN lists?
for (PluginDataObject pdo : pdos) {
DatabaseQuery dbQuery = new DatabaseQuery(pdo.getClass());
Map<String, Object> dataUriFields = DataURIUtil
.createDataURIMap(pdo);
for (Map.Entry<String, Object> field : dataUriFields.entrySet()) {
String fieldName = field.getKey();
// ignore pluginName
if (!DataURIUtil.PLUGIN_NAME_KEY.equals(fieldName)) {
dbQuery.addQueryParam(field.getKey(), field.getValue());
}
}
@SuppressWarnings("unchecked")
List<PluginDataObject> dbPdos = (List<PluginDataObject>) dao
.queryByCriteria(dbQuery);
if (CollectionUtil.isNullOrEmpty(dbPdos)) {
newPdos.add(pdo);
} else {
// shouldn't be more than 1
PluginDataObject dbPdo = dbPdos.get(1);
if ((dbPdo == null)
|| !pdo.getDataURI().equals(dbPdo.getDataURI())) {
newPdos.add(pdo);
}
}
}
if (pdos.length != newPdos.size()) {
pdos = newPdos.toArray(new PluginDataObject[newPdos.size()]);
}
} catch (Exception e) {
statusHandler
.error("Error occurred during duplicate elimination processing",
e);
}
dupCheckTimer.stop();
perfLog.logDuration(pluginName + ": Eliminated "
+ (numBefore - pdos.length) + " of " + numBefore
+ " record(s): Time to process", dupCheckTimer.getElapsedTime());
return pdos;
}
}

View file

@ -96,6 +96,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* 08/08/13 DR16485 ryu Remove call to getDatabaseId() from getMaxInsertTimeByDbId()
* so new GFE databases aren't accidentally created.
* 09/30/2013 #2147 rferrel Changes to archive hdf5 files.
* 11/13/2013 #2517 randerso Added ORDER BY clause to getOverlappingTimes
* </pre>
*
* @author bphillip
@ -814,7 +815,8 @@ public class GFEDao extends DefaultPluginDao {
.find("SELECT dataTime.validPeriod"
+ " FROM GFERecord WHERE parmId = ?"
+ " AND dataTime.validPeriod.start < ?"
+ " AND dataTime.validPeriod.end > ?",
+ " AND dataTime.validPeriod.end > ?"
+ " ORDER BY dataTime.validPeriod.start",
new Object[] { parmId, tr.getEnd(),
tr.getStart() });
return rval;

View file

@ -83,6 +83,10 @@ import com.raytheon.uf.common.util.Pair;
* Scalar/VectorGridSlices, refactor
* Discrete/WeatherGridSlices builders.
* Jun 05, 2013 #2063 dgilling Port history() from A1.
* Nov 11, 2013 #2517 randerso Changed put() to support multiple discontiguous saves
* Added getKeys(tr) to get grid times overlapping a time range
* Removed caching of inventory as it was not being updated when
* grids were updated/deleted
*
* </pre>
*
@ -103,8 +107,6 @@ public class IFPWE {
private final GridParmInfo gpi;
private List<TimeRange> availableTimes;
private final WsId wsId;
/**
@ -126,23 +128,41 @@ public class IFPWE {
}
/**
* Returns the available times of data for the parm
* Returns all available times of data for the parm
*
* @return
* @return the time ranges of all available data for the parm
*/
public List<TimeRange> getKeys() {
if (availableTimes == null) {
availableTimes = new ArrayList<TimeRange>();
List<TimeRange> times = GridParmManager.getGridInventory(parmId)
.getPayload();
if (times != null) {
Collections.sort(times);
availableTimes.addAll(times);
}
List<TimeRange> availableTimes;
ServerResponse<List<TimeRange>> sr = GridParmManager
.getGridInventory(parmId);
if (sr.isOkay()) {
availableTimes = sr.getPayload();
} else {
availableTimes = Collections.emptyList();
}
return availableTimes;
}
/**
* Returns available times of data for the parm that overlap a time range
*
* @param tr
* the desired time range
* @return the time ranges of data that overlap the desired time range
*/
public List<TimeRange> getKeys(TimeRange tr) {
List<TimeRange> overlappingTimes;
ServerResponse<List<TimeRange>> sr = GridParmManager.getGridInventory(
parmId, tr);
if (sr.isOkay()) {
overlappingTimes = sr.getPayload();
} else {
overlappingTimes = Collections.emptyList();
}
return overlappingTimes;
}
/**
* Returns the grid parm info
*
@ -245,38 +265,41 @@ public class IFPWE {
* storage.
*
* @param inventory
* A Map of TimeRanges to IGridSlices to be saved. Time is the
* slice's valid time.
* @param timeRangeSpan
* The replacement time range of grids to be saved. Must cover
* each individual TimeRange in inventory.
* A Map of TimeRanges to List of IGridSlices. TimeRange is the
* replacement time range
* @throws GfeException
* If an error occurs while trying to obtain a lock on the
* destination database.
*/
public void put(LinkedHashMap<TimeRange, IGridSlice> inventory,
TimeRange timeRangeSpan) throws GfeException {
public void put(LinkedHashMap<TimeRange, List<IGridSlice>> inventory)
throws GfeException {
for (Entry<TimeRange, List<IGridSlice>> entry : inventory.entrySet()) {
TimeRange timeRangeSpan = entry.getKey();
statusHandler.debug("Getting lock for ParmID: " + parmId + " TR: "
+ timeRangeSpan);
ServerResponse<List<LockTable>> lockResponse = LockManager
.getInstance().requestLockChange(
new LockRequest(parmId, timeRangeSpan, LockMode.LOCK),
wsId, siteId);
new LockRequest(parmId, timeRangeSpan,
LockMode.LOCK), wsId, siteId);
if (lockResponse.isOkay()) {
statusHandler.debug("LOCKING: Lock granted for: " + wsId
+ " for time range: " + timeRangeSpan);
} else {
statusHandler.error("Could not lock TimeRange " + timeRangeSpan
+ " for parm [" + parmId + "]: " + lockResponse.message());
+ " for parm [" + parmId + "]: "
+ lockResponse.message());
throw new GfeException("Request lock failed. "
+ lockResponse.message());
}
List<GFERecord> records = new ArrayList<GFERecord>(inventory.size());
for (Entry<TimeRange, IGridSlice> entry : inventory.entrySet()) {
GFERecord rec = new GFERecord(parmId, entry.getKey());
rec.setGridHistory(entry.getValue().getHistory());
rec.setMessageData(entry.getValue());
List<IGridSlice> gridSlices = entry.getValue();
List<GFERecord> records = new ArrayList<GFERecord>(
gridSlices.size());
for (IGridSlice slice : gridSlices) {
GFERecord rec = new GFERecord(parmId, slice.getValidTime());
rec.setGridHistory(slice.getHistory());
rec.setMessageData(slice);
records.add(rec);
}
SaveGridRequest sgr = new SaveGridRequest(parmId, timeRangeSpan,
@ -288,9 +311,9 @@ public class IFPWE {
if (sr.isOkay()) {
SendNotifications.send(sr.getNotifications());
} else {
statusHandler.error("Unable to save grids for parm [" + parmId
+ "] over time range " + timeRangeSpan + ": "
+ sr.message());
statusHandler.error("Unable to save grids for parm ["
+ parmId + "] over time range " + timeRangeSpan
+ ": " + sr.message());
}
} finally {
ServerResponse<List<LockTable>> unLockResponse = LockManager
@ -298,8 +321,8 @@ public class IFPWE {
new LockRequest(parmId, timeRangeSpan,
LockMode.UNLOCK), wsId, siteId);
if (unLockResponse.isOkay()) {
statusHandler.debug("LOCKING: Unlocked for: " + wsId + " TR: "
+ timeRangeSpan);
statusHandler.debug("LOCKING: Unlocked for: " + wsId
+ " TR: " + timeRangeSpan);
} else {
statusHandler.error("Could not unlock TimeRange "
+ timeRangeSpan + " for parm [" + parmId + "]: "
@ -309,6 +332,7 @@ public class IFPWE {
}
}
}
}
/**
* Returns the grid history for a specified time range.

View file

@ -19,9 +19,8 @@
##
import os, stat, time, string, bisect, getopt, sys, traceback
import LogStream, iscTime, iscUtil, mergeGrid
#import pupynere as NetCDF
import os, stat, time, string, sys
import iscTime, iscUtil, mergeGrid
try:
# dev environment
from Scientific.IO import NetCDF
@ -82,70 +81,287 @@ from com.raytheon.uf.edex.database.cluster import ClusterTask
# 05/23/13 1759 dgilling Remove unnecessary imports.
# 06/05/13 2063 dgilling Change __siteInDbGrid() to
# call IFPWE.history() like A1.
# 11/05/13 2517 randerso Restructured logging so it coulde be used by WECache
# Changed WECache to limit the number of cached grids kept in memory
#
#
BATCH_WRITE_COUNT = 20
BATCH_DELAY = 0.0
ISC_USER="isc"
MAX_CACHE_BYTES = 64 * 1024 * 1024 # 64 MB
ISC_USER = "isc"
logger = None
## Logging methods ##
def initLogger(logName):
import logging
global logger
logger = iscUtil.getLogger("iscMosaic", logName=logName, logLevel=logging.INFO)
def printTR(tr):
"""
Format time range for printing (yymmdd_hhmm,yymmdd_hhmm)
Works with list or tuple
Args:
tr: the time range to format
Returns:
the formatted time range string
"""
if tr is not None:
format = "%Y%m%d_%H%M"
s = '(' + time.strftime(format, time.gmtime(tr[0])) + ',' + \
time.strftime(format, time.gmtime(tr[1])) + ')'
return s
else:
return "None"
def printShortTR(tr):
"""
Format time range for printing (dd/hh,dd/hh)
Works with list or tuple
Args:
tr: the time range to format
Returns:
the formatted time range string
"""
if tr is not None:
format = "%d/%H"
s = '(' + time.strftime(format, time.gmtime(tr[0])) + '->' + \
time.strftime(format, time.gmtime(tr[1])) + ')'
return s
else:
return "None"
class WECache(object):
"""
Cache representing the grids for a weather element that overlap a time range.
The cache will keep a limited number of grids in memory. This limit is determined
at initialization to be the number of grids that will fit in MAX_CACHE_BYTES (or a minimum of 2).
This is not a general purpose cache. It's behavior is designed to match the access patterns of iscMosaic
"""
def __init__(self, we, tr=None):
self._we = we
self._inv = {}
self._invCache = None
logger.debug("WECache creating: %s", str(self._we.getParmid().toString()))
javaInv = self._we.getKeys()
gridType = self._we.getGridType()
if gridType == "SCALAR":
bytesPerCell = 4
elif gridType == "VECTOR":
bytesPerCell = 8
elif gridType == "WEATHER":
bytesPerCell = 1
elif gridType == "DISCRETE":
bytesPerCell = 1
gloc = self._we.getGpi().getGridLoc()
gridBytes = gloc.getNx().intValue() * gloc.getNy().intValue() * bytesPerCell
self._maxCacheSize = max(2, MAX_CACHE_BYTES / gridBytes)
self._batchSize = self._maxCacheSize / 2
logger.debug("WECache max grids: %d, batch grids: %d", self._maxCacheSize, self._batchSize)
self._inv = {} # Map containing keys for all inventory
self._invCache = None # Cache of inventory sorted by start time
self._dirty = set() # Cache written to but not flushed to disk
self._populated = set() # Grid is currently in the cache
self._loaded = set() # Grid has been loaded into cache at least once
# get only keys that overlap tr
javaInv = self._we.getKeys(iscUtil.toJavaTimeRange(tr))
pyInv = []
for i in xrange(javaInv.size()):
pyInv.append(iscUtil.transformTime(javaInv.get(i)))
# Dont get grids outside of the passed in timerange.
if tr:
tokill = []
for i, t in enumerate(pyInv):
if not self.overlaps(tr, t):
tokill.append(i)
tokill.reverse()
for i in tokill:
del pyInv[i]
# create unpopulated entries for the entire inventory
for invTr in pyInv:
self._inv[invTr] = None
lst = list(pyInv)
while len(lst):
i = lst[:BATCH_WRITE_COUNT]
javaTRs = ArrayList()
for tr in i:
# populate first BATCH_READCOUNT grids
if len(pyInv) > self._batchSize:
trList = pyInv[:self._batchSize - 1]
# add on the last tr since it is used by __splitGridsOnProcBoundary
trList.append(pyInv[-1])
else:
trList = pyInv
self.__loadGrids(trList)
def __loadGrids(self, trList):
javaTRs = ArrayList(len(trList))
for tr in trList:
javaTRs.add(iscUtil.toJavaTimeRange(tr))
gridsAndHist = self._we.get(javaTRs, True)
for idx, tr in enumerate(i):
for idx in range(gridsAndHist.size()):
pair = gridsAndHist.get(idx)
tr = iscUtil.transformTime(pair.getFirst().getValidTime())
if tr in self._loaded:
logger.debug("WECache reloading: %s", printTR(tr))
else:
logger.debug("WECache loading: %s", printTR(tr))
g = self.__encodeGridSlice(pair.getFirst())
h = self.__encodeGridHistory(pair.getSecond())
self._inv[tr] = (g, h)
lst = lst[BATCH_WRITE_COUNT:]
time.sleep(BATCH_DELAY)
self._populated.add(tr)
self._loaded.add(tr)
def keys(self):
if not self._invCache:
self._invCache = tuple(sorted(self._inv.keys(), key=lambda t: t[0]))
return self._invCache
def __getitem__(self, key):
def __getitem__(self, tr):
logger.debug("WECache getting: %s", printTR(tr))
if tr in self._populated or tr in self._dirty:
return self._inv[tr]
if self._inv.has_key(tr):
self.__handleCacheMiss(tr)
return self._inv[tr]
else:
return None
def __handleCacheMiss(self, tr):
"""
This function is called when a time range is requested that is not currently in the cache.
It will load the next batch of unpopulated grids in time range order starting with the time range passed in.
If the cache does not have room for a batch of grids to be loaded without exceeding the max cache size
the earliest dirty grids (or clean if not enough dirty grids are found) are flushed to disk before reading
the next dash.
Args:
tr: the missing time range
"""
logger.debug("WECache miss: %s", printTR(tr))
# determine next batch of keys to read
toRead = self.keys()
toRead = toRead[toRead.index(tr):]
toRead = sorted(set(toRead) - self._populated, key=lambda t: t[0])
toRead = toRead[:self._batchSize]
# if not room to read another batch
if len(self._populated) + self._batchSize > self._maxCacheSize:
toFlush = []
# first flush dirty grids
toFlush += self._populated & self._dirty
# then flush clean grids if necessary
toFlush += self._populated - self._dirty
# flush only enough to read a batch
toFlush = sorted(toFlush, key=lambda t: t[0])
toFlush = toFlush[:self._maxCacheSize - self._batchSize]
self.__flushGrids(toFlush)
self.__loadGrids(toRead)
def __flushGrids(self, trList):
"""
Flush a list time ranges from the cache.
Dirty time ranges will be written to disk.
Writes will be done in _batchSize groups
Args:
trList: the list of time ranges to be flushed
"""
logger.debug("WECache flushing: %d grids", len(trList))
saveRequest = LinkedHashMap()
saveList = [] # python time ranges covered by this saveRequest
saveSize = 0 # number of grids in saveRequest
timeSpan = None # time span if this contiguous batch
gridsToSave = ArrayList(self._batchSize) # grids in this contiguous batch
saveBatch = False
for tr in sorted(trList, key=lambda t: t[0]):
dirty = tr in self._dirty
if dirty:
logger.debug("WECache storing: %s", printTR(tr))
saveList.append(tr)
pyGrid, pyHist = self._inv[tr]
if pyGrid is not None:
javaGrid = self.__buildJavaGridSlice(tr, pyGrid, pyHist)
gridsToSave.add(javaGrid)
if timeSpan is None:
timeSpan = [tr[0], tr[1]]
else:
timeSpan[1] = tr [1]
saveBatch = gridsToSave.size() >= self._batchSize
else: # clean grid
# save contiguous dirty blocks
saveBatch = timeSpan is not None
# if populated and clean just purge from cache
if tr in self._populated:
logger.debug("WECache purging: %s", printTR(tr))
self._inv[tr] = None
self._populated.remove(tr)
if saveBatch:
# add this contiguous batch to saveRequest
logger.debug("WECache saving %d grids in %s", gridsToSave.size(), printTR(timeSpan))
gridSaveTR = iscUtil.toJavaTimeRange(timeSpan)
saveRequest.put(gridSaveTR, gridsToSave)
timeSpan = None
saveBatch = False
saveSize += gridsToSave.size()
gridsToSave = ArrayList(self._batchSize)
# if this saveRequest has reached the batchSize save it
if saveSize >= self._batchSize:
try:
return self._inv[key]
except KeyError:
grid = self._we.getItem(iscUtil.toJavaTimeRange(key))
pyGrid = self.__encodeGridSlice(grid)
history = grid.getGridDataHistory()
pyHist = self.__encodeGridHistory(history)
return (pyGrid, pyHist)
self._we.put(saveRequest)
except:
raise
else: # no exceptions on save, clear saved grids from cache
# depopulate save grids
for tr in saveList:
self._inv[tr] = None
self._populated.discard(tr)
self._dirty.remove(tr)
saveRequest.clear()
saveList = []
saveSize = 0
# save partial batch if necessary
if len(saveList):
if timeSpan is not None:
logger.debug("WECache saving %d grids in %s", gridsToSave.size(), printTR(timeSpan))
gridSaveTR = iscUtil.toJavaTimeRange(timeSpan)
saveRequest.put(gridSaveTR, gridsToSave)
try:
self._we.put(saveRequest)
except:
raise
else: # no exceptions on save, clear saved grids from cache
# depopulate save grids
for tr in saveList:
self._inv[tr] = None
self._populated.discard(tr)
self._dirty.remove(tr)
return
def __setitem__(self, tr, value):
if value is None:
logger.debug("WECache erasing: %s", printTR(tr))
grid = hist = None
else:
logger.debug("WECache setting: %s", printTR(tr))
grid, hist = value
# Remove any overlapping grids
@ -155,46 +371,37 @@ class WECache(object):
tokill.append(itr)
for i in tokill:
del self._inv[i]
self._dirty.discard(i)
self._populated.discard(i)
self._loaded.discard(i)
self._invCache = None
# Now add the new grid if it exists
if grid is not None:
# if cache full flush some grids to disk
if len(self._populated) >= self._maxCacheSize:
toFlush = []
# first flush dirty grids
toFlush += self._populated & self._dirty
# then flush clean grids if necessary
toFlush += self._populated - self._dirty
# flush a full batch is possible
toFlush = sorted(toFlush, key=lambda t: t[0])
toFlush = toFlush[:self._batchSize]
self.__flushGrids(toFlush)
# Now add the new grid
self._inv[tr] = (grid, hist)
self._dirty.add(tr)
self._loaded.add(tr)
self._invCache = None
if grid is not None:
self._populated.add(tr)
def flush(self):
"""Actually writes the contents of the WECache to HDF5/DB"""
# get cache inventory in time range order
# we want to write to disk in contiguous time range blocks so we only
# overwrite what we have full sets of grids for.
inv = list(self.keys())
# Don't believe the grid slices need to be in time order when saving
# but leaving them that way just in case.
gridsToSave = LinkedHashMap()
while inv:
# retrieve the next BATCH of grids to persist
i = inv[:BATCH_WRITE_COUNT]
# pre-compute the replacement TR for the save requests generated by
# IFPWE.put().
# since the inventory is in order it's the start time of the
# first TR and the end time of the last TR.
gridSaveTR = iscUtil.toJavaTimeRange((i[0][0], i[-1][1]))
for tr in i:
javaTR = iscUtil.toJavaTimeRange(tr)
pyGrid, pyHist = self._inv[tr]
javaHist = self.__buildJavaGridHistory(pyHist)
javaGrid = self.__buildJavaGridSlice(javaTR, pyGrid, javaHist)
gridsToSave.put(javaTR, javaGrid)
self._we.put(gridsToSave, gridSaveTR)
# delete the persisted items from the cache and our copy of the
# inventory
gridsToSave.clear()
for tr in i:
del self._inv[tr]
self._invCache = None
inv = inv[BATCH_WRITE_COUNT:]
time.sleep(BATCH_DELAY)
"""Writes the entire contents of the WECache to HDF5/DB"""
# flush entire inventory
self.__flushGrids(self.keys())
def overlaps(self, tr1, tr2):
if (tr1[0] >= tr2[0] and tr1[0] < tr2[1]) or \
@ -215,7 +422,7 @@ class WECache(object):
for theKey in keys:
keyList.append(theKey.toString())
return (grid.__numpy__[0], keyList)
elif gridType =="DISCRETE":
elif gridType == "DISCRETE":
keys = grid.getKey()
keyList = []
for theKey in keys:
@ -229,15 +436,18 @@ class WECache(object):
return tuple(retVal)
def __buildJavaGridSlice(self, tr, grid, history):
javaTR = iscUtil.toJavaTimeRange(tr)
javaHist = self.__buildJavaGridHistory(history)
gridType = self._we.getGridType()
if gridType == "SCALAR":
return self._we.buildScalarSlice(tr, grid.astype(numpy.float32), history)
return self._we.buildScalarSlice(javaTR, grid.astype(numpy.float32), javaHist)
elif gridType == "VECTOR":
return self._we.buildVectorSlice(tr, grid[0].astype(numpy.float32), grid[1].astype(numpy.float32), history)
return self._we.buildVectorSlice(javaTR, grid[0].astype(numpy.float32), grid[1].astype(numpy.float32), javaHist)
elif gridType == "WEATHER":
return self._we.buildWeatherSlice(tr, grid[0].astype(numpy.byte), str(grid[1]), history)
return self._we.buildWeatherSlice(javaTR, grid[0].astype(numpy.byte), str(grid[1]), javaHist)
elif gridType == "DISCRETE":
return self._we.buildDiscreteSlice(tr, grid[0].astype(numpy.byte), str(grid[1]), history)
return self._we.buildDiscreteSlice(javaTR, grid[0].astype(numpy.byte), str(grid[1]), javaHist)
def __buildJavaGridHistory(self, histories):
retVal = ArrayList()
@ -253,16 +463,10 @@ class WECache(object):
class IscMosaic:
## Logging methods ##
def __initLogger(self):
self.__logger=iscUtil.getLogger("iscMosaic",self.__logFile)
def __init__(self, args):
self.__mysite = args['siteID']
self.__userID = args['userID']
self.__db = None # ifpServer database object
self.__dbGrid = None
self.__parmsToProcess = args['parmsToProcess']
self.__blankOtherPeriods = args['blankOtherPeriods']
self.__altMask = args['altMask']
@ -291,26 +495,10 @@ class IscMosaic:
endTime = self.__decodeTimeString(args['endTime'])
self.__processTimePeriod = (startTime, endTime)
self.__initLogger()
def logEvent(self,*msg):
self.__logger.info(iscUtil.tupleToString(*msg))
def logProblem(self,*msg):
self.__logger.error(iscUtil.tupleToString(*msg))
def logException(self,*msg):
self.__logger.exception(iscUtil.tupleToString(*msg))
def logVerbose(self,*msg):
self.__logger.debug(iscUtil.tupleToString(*msg))
def logDebug(self,*msg):
self.logVerbose(iscUtil.tupleToString(*msg))
initLogger(self.__logFile)
def execute(self):
self.logEvent("iscMosaic Starting")
logger.info("iscMosaic Starting")
# get the WxDefinition and DiscreteDefinition
config = IFPServerConfigManager.getServerConfig(self.__mysite)
@ -320,8 +508,8 @@ class IscMosaic:
self.__db = IFPDB(self.__databaseID)
# parms in database
parmsInDb = self.__db.getKeys()
if len(self.__parmsToProcess) == 0:
parmsInDb = self.__db.getKeys()
for i in range(0, parmsInDb.size()):
self.__parmsToProcess.append(parmsInDb.get(i).toString())
@ -336,7 +524,7 @@ class IscMosaic:
if self.__deleteInput:
os.remove(file)
self.logEvent("iscMosaic Finished")
logger.info("iscMosaic Finished")
def __processInputFile(self, filename):
@ -344,9 +532,9 @@ class IscMosaic:
cpu0 = a[0] + a[1]
start = a[4]
self.logEvent("Processing file=", filename)
logger.info("Processing file=%s", filename)
fsize = os.stat(filename)[stat.ST_SIZE]
self.logEvent("Input file size: ", fsize)
logger.info("Input file size: %d", fsize)
gzipFile = None
unzippedFile = None
@ -370,34 +558,24 @@ class IscMosaic:
cpu = a[0] + a[1]
stop1 = a[4]
if hasattr(NetCDF, "netcdf_file"):
# use this for pupynere
# TODO: Remove False flag passed to constructor to resolve memory
# allocation error found in #7788. If AWIPS2 ever moves to 64-bit
# we'll probably have enough address space to allow the file to be
# memory-mapped.
file = NetCDF.netcdf_file(filename, "r", False)
else:
# use this for ScientificIO.NetCDF
file = NetCDF.NetCDFFile(filename, "r")
# check version
fileV = getattr(file, 'fileFormatVersion')
if fileV != "20010816" and fileV != "20030117":
self.logProblem("Incompatible file format found")
logger.error("Incompatible file format found")
raise Exception, "Incompatible file format"
# get creation time
self.__creTime = getattr(file, 'creationTime')
creTimeString = time.asctime(time.gmtime(self.__creTime))
self.logEvent("CreationTime:" , creTimeString)
logger.info("CreationTime: %s" , creTimeString)
# get file start/end processing times
self.__modProcTime = self.__getFileProcessingTimes(file)
if self.__modProcTime is None:
return None
self.logEvent("Process TR: ", self.__printTR(self.__modProcTime))
logger.info("Process TR: %s", printTR(self.__modProcTime))
# prepare for the notification message
totalTimeRange = None
@ -430,18 +608,17 @@ class IscMosaic:
idx = parmName.rfind("_")
parmName = parmName[0:idx] + incomingOfficeType + \
parmName[idx:]
self.logEvent("Renamed to: " + parmName + \
" data from " + siteID)
logger.info("Renamed to: %s data from %s", parmName, siteID)
# ignore this parm?
if parmName in self.__parmsToIgnore:
self.logEvent("Ignoring ", parmName)
logger.info("Ignoring %s", parmName)
continue
# match in ifp database?
if not parmName in self.__parmsToProcess and \
len(self.__parmsToProcess) != 0:
self.logEvent("Skipping", parmName)
logger.info("Skipping %s", parmName)
continue
(pName, pTR, nGrids, nFail) = self.__processParm(parmName, vars, remapHistory, filename)
@ -463,13 +640,13 @@ class IscMosaic:
#announce storage
if len(self.__announce) and totalGrids > 0:
msg = self.__announce + self.__siteID + ' ' + `pParms` + ' ' + self.__printShortTR(totalTimeRange) + ' #Grids=' + `totalGrids`
msg = self.__announce + self.__siteID + ' ' + `pParms` + ' ' + printShortTR(totalTimeRange) + ' #Grids=' + `totalGrids`
if totalFails:
msg = msg + '[' + `totalFails` + ' FAILED]'
notification = UserMessageNotification(msg, Priority.CRITICAL, "ISC", self.__mysite)
else:
notification = UserMessageNotification(msg, Priority.EVENTA, "ISC", self.__mysite)
self.logEvent(msg)
logger.info(msg)
SendNotifications.send(notification)
# announce "modified/adjusted" data
@ -481,10 +658,14 @@ class IscMosaic:
a = os.times()
cpugz = a[0] + a[1]
stop = a[4]
self.logEvent("Elapsed/CPU time: ",
"%-.2f" % (stop1 - start), "/", "%-.2f" % (cpu - cpu0), "decompress,",
"%-.2f" % (stop - stop1), "/", "%-.2f" % (cpugz - cpu), "processing,",
"%-.2f" % (stop - start), "/", "%-.2f" % (cpugz - cpu0), "total")
logger.info("Elapsed/CPU time: "
"%-.2f / %-.2f decompress, "
"%-.2f / %-.2f processing, "
"%-.2f / %-.2f total",
stop1 - start, cpu - cpu0,
stop - stop1, cpugz - cpu,
stop - start, cpugz - cpu0)
def __processParm(self, parmName, vars, history, filename):
@ -495,18 +676,23 @@ class IscMosaic:
inTimesProc = []
numFailed = 0
self.__siteID = str(getattr(vars[0], "siteID"))
inTimes = self.__getIncomingValidTimes(vars[0])
logger.info("Processing %s #Grids=%d Site=%s", parmName, len(inTimes), self.__siteID)
if self.__eraseFirst or len(inTimes) > 0:
while retryAttempt != retries:
self.logDebug("iscMosaic: Attempting to acquire cluster lock for:",parmName)
logger.debug("iscMosaic: Attempting to acquire cluster lock for: %s", parmName)
startTime = time.time()
clusterLock = ClusterLockUtils.lock("ISC Write Lock",parmName , 120000, True)
elapsedTime = (time.time() - startTime)*1000
self.logDebug("iscMosaic: Request for",parmName+" took",elapsedTime,"ms")
clusterLock = ClusterLockUtils.lock("ISC Write Lock", parmName , 120000, True)
elapsedTime = (time.time() - startTime) * 1000
logger.debug("iscMosaic: Request for %s took %d ms", parmName, elapsedTime)
if str(clusterLock.getLockState()) == "SUCCESSFUL":
self.logDebug("iscMosaic: Successfully acquired cluster lock for:",parmName)
logger.debug("iscMosaic: Successfully acquired cluster lock for: %s", parmName)
try:
# open up the ifpServer weather element
self.__dbwe = self.__db.getItem(parmName,ISC_USER)
self._wec = WECache(self.__dbwe, self.__modProcTime)
self.__dbwe = self.__db.getItem(parmName, ISC_USER)
self._wec = WECache(self.__dbwe, tr=self.__modProcTime)
self.__rateParm = self.__dbwe.getGpi().isRateParm()
self.__parmName = parmName
@ -517,7 +703,6 @@ class IscMosaic:
gridType = getattr(vars[0], "gridType")
minV = self.__dbwe.getGpi().getMinValue()
# compute the site mask
self.__siteID = str(getattr(vars[0], "siteID"))
if self.__areaMask is None:
self.__areaMask = self.__computeAreaMask().getGrid().__numpy__[0]
@ -527,24 +712,19 @@ class IscMosaic:
minV, self.__areaMask, gridType, self.__dbwe.getDiscreteKeys())
# erase all existing grids first?
#self.__dbinv = self.__dbwe.keys()
self.__dbinv = self._wec.keys()
if self.__eraseFirst:
self.__eraseAllGrids(self.__modProcTime)
else:
try:
self.__splitGridsOnProcBoundary(self.__modProcTime)
except:
self.logProblem('Failure to splitGridsOnProcBoundary ',
' Parm=', parmName, ' Time=',
self.__printTR(self.__modProcTime), traceback.format_exc())
logger.exception('Failure to splitGridsOnProcBoundary Parm=%s Time=%s',
parmName, printTR(self.__modProcTime))
if self.__eraseFirst:
self.__eraseAllGrids(self.__modProcTime)
# process each incoming grid
inTimes = self.__getIncomingValidTimes(vars[0])
inTimesProc = []
numFailed = 0
self.logEvent("Processing ", parmName, " #Grids=",
len(inTimes), " Site=", self.__siteID)
# process incoming grids
for i in xrange(len(inTimes)):
@ -555,8 +735,7 @@ class IscMosaic:
if tr is not None:
inTimesProc.append(tr)
try:
self.logDebug("Processing Grid: ", parmName, \
" TR=", self.__printTR(tr))
logger.debug("Processing Grid: %s TR=%s", parmName, printTR(tr))
# get the grid and remap it
grid = self.__getGridFromNetCDF(gridType, vars, i)
@ -577,23 +756,18 @@ class IscMosaic:
grid = self.__adjustForTime(inTimes[i], tr, grid,
inFillV)
# get inventory original inventory
# self.__dbinv = self.__dbwe.getKeys()
# print self.__dbinv
# merge the grids
self.__processIncomingGrid(parmName, grid, history[i],
mGrid, tr, inFillV)
except:
self.logProblem('Failure to process grid in file [',
filename, '] Parm=', parmName, ' Time=',
self.__printTR(tr), traceback.format_exc())
logger.exception('Failure to process grid in file [%s] Parm=%s Time=%s',
filename, parmName, printTR(tr))
numFailed = numFailed + 1
else:
self.logDebug("Skipping Grid: ", parmName, " TR=", self.__printTR(tr), "outside start/end range")
logger.debug("Skipping Grid: %s TR=%s outside start/end range",
parmName, printTR(tr))
# blank out any gaps
@ -605,9 +779,12 @@ class IscMosaic:
tr = iscTime.intersection(blankTimes[i], self.__modProcTime)
if tr is not None:
try:
logger.debug("Processing Blank: %s TR=%s",
parmName, self.__printTR(tr))
self.__processBlankTime(mGrid, tr)
except:
self.logProblem('Failure to process grid blanking Parm=', parmName, ' Time=', self.__printTR(tr), traceback.format_exc())
logger.exception('Failure to process grid blanking Parm=%s Time=%s',
parmName, printTR(tr))
@ -619,23 +796,25 @@ class IscMosaic:
retryAttempt = retries
except:
retryAttempt = retryAttempt + 1
self.logProblem("Error saving ISC data. Retrying (", retryAttempt, "/", retries, ")",traceback.format_exc())
logger.exception("Error saving ISC data. Retrying ( %d / %d )", retryAttempt, retries)
time.sleep(1)
finally:
self.logDebug("iscMosaic: Attempting to release cluster lock for:",parmName)
logger.debug("iscMosaic: Attempting to release cluster lock for: %s", parmName)
ClusterLockUtils.unlock(clusterLock, False)
self.logDebug("iscMosaic: Successfully released cluster lock for:",parmName)
logger.debug("iscMosaic: Successfully released cluster lock for: %s", parmName)
elif str(clusterLock.getLockState()) == "OLD":
retryAttempt = retryAttempt + 1
# Clear old lock to retry
self.logDebug("Old lock retrieved for ISC write. Attempting to renew lock")
logger.debug("Old lock retrieved for ISC write. Attempting to renew lock")
ClusterLockUtils.unlock(clusterLock, False)
elif str(clusterLock.getLockState()) == "FAILED":
retryAttempt = retryAttempt + 1
if retryAttempt == retries:
self.logProblem("Cluster lock could not be established for ",self._we.getParmid(),"at time range",TimeRange(tr[0],tr[1]),"Data was not saved.")
logger.error("Cluster lock could not be established for %s at time range %s Data was not saved.",
self._we.getParmid(), TimeRange(tr[0], tr[1]))
else:
self.logProblem("Cluster lock request failed for ISC write.", retries, "Retrying (", retryAttempt, "/", retries, ")")
logger.error("Cluster lock request failed for ISC write. Retrying ( %d / %d )",
retryAttempt, retries)
time.sleep(1)
return (pName, totalTimeRange, len(inTimesProc), numFailed)
@ -647,9 +826,9 @@ class IscMosaic:
# get the associated db grids, merge, and store
for m in merge:
self.logDebug("Merge: ", self.__printTR(m[0]),
self.__printTR(m[1]), m[2])
gotGrid = self.__getDbGrid(m[0])
logger.debug("Merge: %s %s %s", printTR(m[0]),
printTR(m[1]), m[2])
gotGrid = self._wec[m[0]]
if gotGrid is not None:
destGrid = gotGrid[0]
@ -704,48 +883,15 @@ class IscMosaic:
def __storeGrid(self, tr, grid):
if grid is not None and grid[1] is not None and grid[0] is not None:
#try:
#self.__dbwe.histSave(tr, grid[0], grid[1])
logger.debug("Store: %s", printTR(tr))
self._wec[tr] = grid
#except Exception, e:
# self.logProblem("StoreFailed: ", tr, `grid[1]`)
# raise e
if tr not in self.__dbinv:
self.__dbinv = self._wec.keys()
#self.__dbinv = self.__dbwe.keys()
self.logDebug("Store:", self.__printTR(tr))
else:
logger.debug("Erase: %s", printTR(tr))
self._wec[tr] = None
self.__dbinv = self._wec.keys()
#self.__dbwe[tr] = None
#self.__dbinv = self.__dbwe.keys()
self.logDebug("Erase:", self.__printTR(tr))
#---------------------------------------------------------------------
# get db grid
# Gets the needed database grid
# tr = desired grid, identified by time range
# Returns tuple of (grid, history) (or None if unknown)
#---------------------------------------------------------------------
def __getDbGrid(self, tr):
if tr is None:
return None
if self.__dbGrid is None or tr != self.__dbGrid[2]:
self.__dbGrid = None
#grid = self.__dbwe.getGridAndHist(tr)
grid = self._wec[tr]
if grid is not None:
destGrid, history = grid
self.__dbGrid = (destGrid, history, tr)
else:
self.logProblem("Unable to access grid for ",
self.__printTR(tr), "for ", self.__parmName)
return None
return (self.__dbGrid[0], self.__dbGrid[1])
#---------------------------------------------------------------------
# calculate file start/end processing times
@ -761,22 +907,9 @@ class IscMosaic:
self.__processTimePeriod)
if modProcTime is None:
self.logProblem("Skipping file due to non overlapping periods")
logger.error("Skipping file due to non overlapping periods")
return modProcTime
#-------------------------------------------------------------------------
# Get printable time range (yymmdd_hhmm,yymmdd_hhmm)
# Works with list or tuple
#-------------------------------------------------------------------------
def __printTR(self, tr):
if tr is not None:
format = "%Y%m%d_%H%M"
s = '(' + time.strftime(format, time.gmtime(tr[0])) + ',' + \
time.strftime(format, time.gmtime(tr[1])) + ')'
return s
else:
return "None"
def __decodeTimeString(self, timeStr):
"Create an Integer time from a string: YYYYMMDD_HHMM"
@ -788,8 +921,7 @@ class IscMosaic:
except ImportError:
importError = True
except:
self.logProblem(timeStr, \
"is not a valid time string. Use YYYYMMDD_HHMM", traceback.format_exc())
logger.exception("%s is not a valid time string. Use YYYYMMDD_HHMM", timeStr)
raise Exception, "Bad date format YYYYMMDD_HHMM"
return iscTime.timeFromComponents(intTime)
@ -927,9 +1059,9 @@ class IscMosaic:
def __determineFillValue(self, var):
gridType = getattr(var, "gridType")
if gridType == 'SCALAR' or gridType == 'VECTOR':
return - 30000.0
return -30000.0
else:
return - 127
return -127
#---------------------------------------------------------------------
# compute the area mask
@ -953,18 +1085,17 @@ class IscMosaic:
areaMask = iscUtil.getEditArea(self.__altMask, self.__mysite)
areaMask.setGloc(self.__dbwe.getGpi().getGridLoc())
except:
self.logProblem("Unable to access edit mask [",
self.__altMask, "]", traceback.format_exc())
raise Exception, "Unknown edit area mask [" + self.__altMask + ']'
logger.exception("Unable to access edit mask [%s]",
self.__altMask)
raise Exception("Unknown edit area mask [%s]" % self.__altMask)
else:
maskName = "ISC_" + self.__siteID
try:
areaMask = iscUtil.getEditArea(maskName, self.__mysite)
areaMask.setGloc(self.__dbwe.getGpi().getGridLoc())
except:
self.logProblem("Unable to access edit mask [",
maskName, "]", traceback.format_exc())
raise Exception, "Unknown edit area mask [" + maskName + ']'
logger.exception("Unable to access edit mask [%s]", maskName)
raise Exception("Unknown edit area mask [%s]" % maskName)
return areaMask
@ -981,7 +1112,7 @@ class IscMosaic:
for m in mergeInfo:
if m[0] != m[1]: #split grid needed
if m[0] != oldTR:
oldGrid = self.__getDbGrid(m[0])
oldGrid = self._wec[m[0]]
oldTR = m[0]
if oldGrid is not None:
if self.__rateParm:
@ -990,7 +1121,6 @@ class IscMosaic:
self.__storeGrid(m[1], (adjGrid, oldGrid[1]))
else:
self.__storeGrid(m[1], oldGrid)
self.__dbGrid = None
#-------------------------------------------------------------------------
# Get Incoming netCDF file grid valid times
@ -1206,15 +1336,6 @@ class IscMosaic:
outKeys.append(s)
return outKeys
def __printShortTR(self, tr):
if tr is not None:
format = "%d/%H"
s = '(' + time.strftime(format, time.gmtime(tr[0])) + '->' + \
time.strftime(format, time.gmtime(tr[1])) + ')'
return s
else:
return "None"
#---------------------------------------------------------------------
# adjust for time
# Adjusts a rate dependent grid based on time durations. No processing
@ -1279,7 +1400,7 @@ class IscMosaic:
if m[0] != None and m[2] == 1:
if self.__siteInDbGrid(m[0]):
try:
(destGrid, oldHist) = self.__getDbGrid(m[0])
(destGrid, oldHist) = self._wec[m[0]]
except:
destGrid = None
oldHist = None
@ -1323,13 +1444,13 @@ class IscMosaic:
# set up error message
smsg = "Adjusting DiscreteKey for Compatibility: " + parmName + \
' tr=' + self.__printTR(tr)
' tr=' + printTR(tr)
# get the list of discrete keys for this parameter that are allowed
dd = self.__disDef.keys(parmName)
if dd.size() == 0:
self.logProblem("Unable to validate keys for ",
parmName, " - no def in DiscreteDefinition")
logger.error("Unable to validate keys for %s - no def in DiscreteDefinition",
parmName)
return grid
#now go through the incoming grid's keys and validate each one
@ -1379,13 +1500,11 @@ class IscMosaic:
keyentry = "^".join(eachKey) #join back to string
if len(changedReasons):
self.logProblem(smsg,
"from [" + oldEntry + "] to [" + keyentry + "]",
"(" + ",".join(changedReasons) + ")")
msg = self.__siteID + " " + parmName + " " + \
self.__printShortTR(tr) + \
" [" + oldEntry + "] -> [" + keyentry + "] (" + \
",".join(changedReasons) + ")"
logger.error("%s from [%s] to [%s] (%s)",
smsg, oldEntry, keyentry, ",".join(changedReasons))
msg = "%s %s %s [%s] -> [%s] (%s)" % \
self.__siteID, parmName, printShortTR(tr), oldEntry, keyentry, ",".join(changedReasons)
self.__adjDataMsg.append(msg)
key[idx] = keyentry #store back into list
@ -1409,7 +1528,7 @@ class IscMosaic:
# set up error message
smsg = "Adjusting WeatherKey for Compatibility: " + parmName + \
' tr=' + self.__printTR(tr)
' tr=' + printTR(tr)
#now go through the incoming grid's keys and validate each one
for idx in xrange(len(key)): #each index of the weather key
@ -1481,13 +1600,12 @@ class IscMosaic:
# report any changes
if len(changedReasons):
self.logProblem(smsg,
" from [" + oldEntry + "] to [" + keyentry + "]",
"(" + ",".join(changedReasons) + ")")
msg = self.__siteID + " " + parmName + " " + \
self.__printShortTR(tr) + \
" [" + oldEntry + "] -> [" + keyentry + "] (" + \
",".join(changedReasons) + ")"
logger.error("%s from [%s] to [%s] (%s)",
smsg, oldEntry, keyentry, ",".join(changedReasons))
msg = "%s %s %s [%s] -> [%s] (%s)" % \
self.__siteID, parmName, printShortTR(tr), oldEntry, keyentry, ",".join(changedReasons)
self.__adjDataMsg.append(msg)
return (g, key)
@ -1497,26 +1615,7 @@ class IscMosaic:
# processTimePeriod procesTimePeriod = time range to remove grids
#---------------------------------------------------------------------
def __eraseAllGrids(self, processTimePeriod):
dbinv = self.__dbinv
mergeInfo = iscTime.mergeTR(processTimePeriod, dbinv)
oldGrid = None
oldTR = None
for m in mergeInfo:
if m[2] == 0: #split grid, don't erase
if m[0] != oldTR:
oldGrid = self.__getDbGrid(m[0])
oldTR = m[0]
if oldGrid is not None:
if self.__rateParm:
adjGrid = self.__adjustForTime(m[0], m[1], oldGrid[0],
0.0) #old db grids don'thave missing value flags
self.__storeGrid(m[1], (adjGrid, oldGrid[1]))
else:
self.__storeGrid(m[1], oldGrid)
elif m[2] == 1: #overlaps mean erase
self.__storeGrid(m[1], None)
self.__dbGrid = None
self.__storeGrid(processTimePeriod, None)
def convertList(unknownList):

View file

@ -18,13 +18,13 @@
# further licensing information.
##
import string, IrtAccess, JUtil
import string, IrtAccess, JUtil, logging
import xml, pickle, tempfile, os
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, SubElement
import LogStream
from datetime import datetime
from time import gmtime,strftime
from time import gmtime, strftime
from java.io import File
from com.raytheon.uf.common.time import TimeRange
from com.raytheon.uf.common.dataplugin.gfe.db.objects import GridLocation
@ -49,6 +49,7 @@ from com.raytheon.uf.common.localization import LocalizationContext_Localization
# 02/19/13 1637 randerso Removed unused import
# 03/11/13 1759 dgilling Move siteConfig import into
# methods where it's needed.
# 11/07/13 2517 randerso Allow getLogger to override logLevel
#
#
#
@ -59,12 +60,12 @@ def getEditArea(name, siteID):
commonStaticConfig = pathMgr.getContext(LocalizationType.COMMON_STATIC, LocalizationLevel.SITE)
commonStaticConfig.setContextName(siteID)
file = pathMgr.getFile(commonStaticConfig,"gfe/editAreas"+ File.separator + name + ".xml")
file = pathMgr.getFile(commonStaticConfig, "gfe/editAreas" + File.separator + name + ".xml")
if not os.path.exists(file.getPath()):
commonStaticConfig = pathMgr.getContext(LocalizationType.COMMON_STATIC, LocalizationLevel.CONFIGURED)
commonStaticConfig.setContextName(siteID)
file = pathMgr.getFile(commonStaticConfig,"gfe/editAreas"+ File.separator + name + ".xml")
file = pathMgr.getFile(commonStaticConfig, "gfe/editAreas" + File.separator + name + ".xml")
refData = None
@ -72,7 +73,7 @@ def getEditArea(name, siteID):
if os.path.exists(file.getPath()):
refData = SerializationUtil.jaxbUnmarshalFromXmlFile(file.getPath());
else:
LogStream.logProblem("EDIT AREA NOT FOUND: ",name," for site ",siteID)
LogStream.logProblem("EDIT AREA NOT FOUND: ", name, " for site ", siteID)
except:
LogStream.logProblem("Unable to unmarshal " + name + " in iscExtract")
@ -84,7 +85,7 @@ def saveEditAreaGrid(maskName, iscMask, siteID):
pathMgr = PathManagerFactory.getPathManager();
commonStaticConfig = pathMgr.getContext(LocalizationType.COMMON_STATIC, LocalizationLevel.CONFIGURED)
commonStaticConfig.setContextName(siteID)
sitePath = pathMgr.getFile(commonStaticConfig,"gfe/editAreas").getPath()
sitePath = pathMgr.getFile(commonStaticConfig, "gfe/editAreas").getPath()
editAreaPath = str(sitePath) + "/" + maskName + ".xml"
SerializationUtil.jaxbMarshalToXmlFile(iscMask, editAreaPath)
@ -92,7 +93,7 @@ def deleteEditArea(name, siteID):
pathMgr = PathManagerFactory.getPathManager()
commonStaticConfig = pathMgr.getContext(LocalizationType.COMMON_STATIC, LocalizationLevel.CONFIGURED)
commonStaticConfig.setContextName(siteID)
file = pathMgr.getFile(commonStaticConfig,"gfe/editAreas"+ File.separator + name + ".xml")
file = pathMgr.getFile(commonStaticConfig, "gfe/editAreas" + File.separator + name + ".xml")
file.delete()
def transformTime(tr):
@ -135,7 +136,7 @@ def sortServers(a, b):
sameSiteA = (a['mhsid'] == a['site'])
sameSiteB = (b['mhsid'] == b['site'])
if sameSiteA and not sameSiteB:
return - 1
return -1
elif not sameSiteA and sameSiteB:
return 1
#both are same sites, check for host next
@ -143,7 +144,7 @@ def sortServers(a, b):
regHostA = (a['host'][0:3] in ['dx4', 'px3'])
regHostB = (b['host'][0:3] in ['dx4', 'px3'])
if regHostA and not regHostB:
return - 1
return -1
elif not regHostA and regHostB:
return 1
# same host, but not preferred host
@ -151,7 +152,7 @@ def sortServers(a, b):
regPortA = (a['port'] == "98000000")
regPortB = (b['port'] == "98000000")
if regPortA and not regPortB:
return - 1
return -1
elif not regPortA and regPortB:
return 1
return 1 #must be non-standard, put at end of list
@ -228,20 +229,20 @@ def createDomainDict(xml):
return pickledFile
def unPickle(str):
import pickle,tempfile,os,JUtil
import pickle, tempfile, os, JUtil
tempfile.tempdir = "/tmp/"
fname = tempfile.mktemp(".bin")
FILE = open(fname,"w")
FILE = open(fname, "w")
FILE.write(str)
FILE.close()
FILE = open(fname,"r")
FILE = open(fname, "r")
retVal = pickle.load(FILE)
FILE.close()
return retVal
def getRequestXML(xml,selectedServers, selectedWEList):
def getRequestXML(xml, selectedServers, selectedWEList):
irt = IrtAccess.IrtAccess("")
selectedServers = JUtil.javaStringListToPylist(selectedServers)
selectedWElist = JUtil.javaStringListToPylist(selectedWEList)
@ -278,8 +279,7 @@ def getRequestXML(xml,selectedServers, selectedWEList):
return xmlreq;
def getLogger(scriptName, logName=None):
import logging
def getLogger(scriptName, logName=None, logLevel=logging.INFO):
# be relocating this import here we allow
# com.raytheon.edex.plugin.gfe.isc.IscScript to dynamically
# modify its include path with the proper siteConfig just before
@ -287,24 +287,24 @@ def getLogger(scriptName, logName=None):
import siteConfig
if logName is None:
logPath=siteConfig.GFESUITE_LOGDIR+"/"+strftime("%Y%m%d", gmtime())
logName=scriptName+".log"
logPath = siteConfig.GFESUITE_LOGDIR + "/" + strftime("%Y%m%d", gmtime())
logName = scriptName + ".log"
else:
logPath=os.path.dirname(logName)
if len(logPath)==0:
logPath=siteConfig.GFESUITE_LOGDIR+"/"+strftime("%Y%m%d", gmtime())
logName=os.path.basename(logName)
logPath = os.path.dirname(logName)
if len(logPath) == 0:
logPath = siteConfig.GFESUITE_LOGDIR + "/" + strftime("%Y%m%d", gmtime())
logName = os.path.basename(logName)
logFile=logPath+"/"+logName
logFile = logPath + "/" + logName
if not os.path.exists(logPath):
os.makedirs(logPath)
theLog = logging.getLogger(scriptName)
theLog.setLevel(logging.INFO)
theLog.setLevel(logLevel)
ch = logging.FileHandler(logFile)
ch.setLevel(logging.INFO)
ch.setLevel(logLevel)
formatter = logging.Formatter("%(levelname)s %(asctime)s [%(process)d:%(thread)d] %(filename)s: %(message)s")
ch.setFormatter(formatter)
for h in theLog.handlers:
@ -313,8 +313,7 @@ def getLogger(scriptName, logName=None):
return theLog
def tupleToString(*msg):
concatMsg=""
concatMsg = ""
for m in msg:
concatMsg=concatMsg+" "+str(m)
concatMsg = concatMsg + " " + str(m)
return concatMsg

View file

@ -36,6 +36,7 @@ import LogStream, fcntl
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 07/06/09 1995 bphillip Initial Creation.
# 11/05/13 2517 randerso Improve memory utilization
#
#
#
@ -54,7 +55,7 @@ class MergeGrid:
# gridType = 'SCALAR', 'VECTOR', 'WEATHER', 'DISCRETE'
#---------------------------------------------------------------------
def __init__(self, creationTime, siteID, inFillValue, outFillValue,
areaMask, gridType, discreteKeys = None):
areaMask, gridType, discreteKeys=None):
self.__creationTime = creationTime
self.__siteID = siteID
self.__inFillV = inFillValue
@ -94,7 +95,7 @@ class MergeGrid:
for k in range(len(wxB[1])):
index = self.__findKey(wxB[1][k], key)
newGrid = numpy.where(gridB == k, index, newGrid)
newGrid[gridB == k] = index
return (key, wxA[0], newGrid)
@ -113,7 +114,7 @@ class MergeGrid:
# removal any old entry
if historyB is not None:
for h in historyB:
index = string.find(h, ":"+ self.__siteID + "_GRID")
index = string.find(h, ":" + self.__siteID + "_GRID")
if index == -1:
out.append(h)
@ -140,17 +141,17 @@ class MergeGrid:
# merge the grids
if gridA is not None:
inMask = numpy.not_equal(gridA, self.__inFillV)
mask = numpy.logical_and(inMask, self.__areaMask)
mask = numpy.not_equal(gridA, self.__inFillV)
numpy.logical_and(mask, self.__areaMask, mask)
if gridB is None:
gridB = numpy.zeros(gridA.shape) + self.__outFillV
return numpy.where(mask, gridA, self.__outFillV)
else:
return numpy.where(mask, gridA, gridB)
# blank out the data
else:
blankGrid = numpy.zeros(gridB.shape) + self.__outFillV
return numpy.where(self.__areaMask, blankGrid, gridB)
return numpy.where(self.__areaMask, self.__outFillV, gridB)
#---------------------------------------------------------------------
# merge vector grid
@ -165,24 +166,21 @@ class MergeGrid:
# merge the grids
if gridA is not None:
inMask = numpy.not_equal(gridA[0], self.__inFillV)
mask = numpy.logical_and(inMask, self.__areaMask)
mask = numpy.not_equal(gridA[0], self.__inFillV)
numpy.logical_and(mask, self.__areaMask, mask)
if gridB is None:
gridSize = gridA[0].shape
gridB = (numpy.zeros(gridSize) + self.__outFillV,
numpy.zeros(gridSize) + 0.0)
magGrid = numpy.where(mask, gridA[0], self.__outFillV)
dirGrid = numpy.where(mask, gridA[1], 0.0)
else:
magGrid = numpy.where(mask, gridA[0], gridB[0])
dirGrid = numpy.where(mask, gridA[1], gridB[1])
return (magGrid, dirGrid)
# blank out the data
else:
blankGrid = numpy.zeros(gridB[0].shape) + self.__outFillV
blankDirGrid = numpy.zeros_like(gridB[1])
magGrid = numpy.where(self.__areaMask, blankGrid, gridB[0])
dirGrid = numpy.where(self.__areaMask, blankDirGrid, gridB[1])
magGrid = numpy.where(self.__areaMask, self.__outFillV, gridB[0])
dirGrid = numpy.where(self.__areaMask, 0.0, gridB[1])
return (magGrid, dirGrid)
@ -200,8 +198,8 @@ class MergeGrid:
noWx = "<NoCov>:<NoWx>:<NoInten>:<NoVis>:"
# merge the grids
if gridA is not None:
inMask = numpy.not_equal(gridA[0], self.__inFillV)
mask = numpy.logical_and(inMask, self.__areaMask)
mask = numpy.not_equal(gridA[0], self.__inFillV)
numpy.logical_and(mask, self.__areaMask, mask)
if gridB is None: #make an empty grid
noWxKeys = []
@ -234,8 +232,8 @@ class MergeGrid:
# merge the grids
if gridA is not None:
inMask = numpy.not_equal(gridA[0], self.__inFillV)
mask = numpy.logical_and(inMask, self.__areaMask)
mask = numpy.not_equal(gridA[0], self.__inFillV)
numpy.logical_and(mask, self.__areaMask)
if gridB is None: #make an empty grid
noKeys = []

View file

@ -1,48 +1,33 @@
<beans xmlns="http://www.springframework.org/schema/beans"
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="obsDecoder" class="com.raytheon.edex.plugin.obs.ObsDecoder"/>
<bean id="obsDecoder" class="com.raytheon.edex.plugin.obs.ObsDecoder" />
<bean id="metarPointData" class="com.raytheon.edex.plugin.obs.metar.MetarPointDataTransform"/>
<bean id="metarPointData" class="com.raytheon.edex.plugin.obs.metar.MetarPointDataTransform" />
<bean id="obsSeparator" class="com.raytheon.edex.plugin.obs.metar.MetarSeparator" />
<bean id="obsDistRegistry" factory-bean="distributionSrv"
factory-method="register">
<bean id="obsDistRegistry" factory-bean="distributionSrv" factory-method="register">
<constructor-arg value="obs" />
<constructor-arg value="jms-dist:queue:Ingest.obs"/>
<constructor-arg value="jms-dist:queue:Ingest.obs" />
</bean>
<bean id="obsCamelRegistered" factory-bean="contextManager"
factory-method="register"
<bean id="obsCamelRegistered" factory-bean="contextManager" factory-method="register"
depends-on="persistCamelRegistered,
shefCamelRegistered,
metarToHMDBCamelRegistered">
<constructor-arg ref="obs-camel"/>
<constructor-arg ref="obs-camel" />
</bean>
<camelContext id="obs-camel"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"
autoStartup="false">
<!--
<endpoint id="metarFileEndpoint" uri="file:${edex.home}/data/sbn/metar?noop=true&amp;idempotent=false" />
<route id="metarFileConsumerRoute">
<from ref="metarFileEndpoint" />
<bean ref="fileToString" />
<setHeader headerName="pluginName">
<constant>obs</constant>
</setHeader>
<to uri="jms-durable:queue:Ingest.obs" />
</route>
-->
<camelContext id="obs-camel" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler" autoStartup="false">
<!-- Begin METAR routes -->
<route id="metarIngestRoute">
<from uri="jms-durable:queue:Ingest.obs"/>
<from uri="jms-durable:queue:Ingest.obs" />
<setHeader headerName="pluginName">
<constant>obs</constant>
</setHeader>
@ -50,16 +35,17 @@
<pipeline>
<bean ref="stringToFile" />
<bean ref="obsDecoder" method="decode" />
<bean ref="dupElim" />
<bean ref="metarPointData" method="toPointData" />
<multicast>
<to uri="direct-vm:persistIndexAlert" />
<to uri="direct-vm:metarToShef" />
<to uri="direct-vm:metarToHMDB"/>
<to uri="direct-vm:metarToHMDB" />
</multicast>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:metar?level=ERROR"/>
<to uri="log:metar?level=ERROR" />
</doCatch>
</doTry>
</route>

View file

@ -1,4 +1,5 @@
<beans xmlns="http://www.springframework.org/schema/beans"
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
@ -10,40 +11,23 @@
<constructor-arg ref="sfcobsPluginName" />
</bean>
<bean id="sfcobsDistRegistry" factory-bean="distributionSrv"
factory-method="register">
<bean id="sfcobsDistRegistry" factory-bean="distributionSrv" factory-method="register">
<constructor-arg value="sfcobs" />
<constructor-arg value="jms-dist:queue:Ingest.sfcobs"/>
<constructor-arg value="jms-dist:queue:Ingest.sfcobs" />
</bean>
<bean id="sfcobsCamelRegistered" factory-bean="contextManager"
factory-method="register"
<bean id="sfcobsCamelRegistered" factory-bean="contextManager" factory-method="register"
depends-on="persistCamelRegistered,
shefCamelRegistered">
<constructor-arg ref="sfcobs-camel"/>
<constructor-arg ref="sfcobs-camel" />
</bean>
<camelContext id="sfcobs-camel"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler"
autoStartup="false">
<!--
<endpoint id="sfcobsFileEndpoint"
uri="file:${edex.home}/data/sbn/sfcobs?noop=true&amp;idempotent=false" />
<route id="sfcobsFileConsumerRoute">
<from ref="sfcobsFileEndpoint" />
<bean ref="fileToString" />
<setHeader headerName="pluginName">
<constant>sfcobs</constant>
</setHeader>
<to uri="jms-durable:queue:Ingest.sfcobs" />
</route>
-->
<camelContext id="sfcobs-camel" xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler" autoStartup="false">
<!-- Begin sfcobs routes -->
<route id="sfcobsIngestRoute">
<from uri="jms-durable:queue:Ingest.sfcobs"/>
<from uri="jms-durable:queue:Ingest.sfcobs" />
<setHeader headerName="pluginName">
<constant>sfcobs</constant>
</setHeader>
@ -51,18 +35,18 @@
<doTry>
<pipeline>
<bean ref="sfcobsDecoder" method="decode" />
<bean ref="dupElim" />
<bean ref="sfcobsPointData" method="toPointData" />
<multicast>
<to uri="direct-vm:persistIndexAlert" />
<to uri="direct-vm:synopticToShef"/>
<to uri="direct-vm:synopticToShef" />
</multicast>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:sfcobs?level=ERROR"/>
<to uri="log:sfcobs?level=ERROR" />
</doCatch>
</doTry>
</route>
</camelContext>
</beans>

View file

@ -63,7 +63,7 @@ import com.raytheon.uf.common.util.ConvertUtil;
*/
public class DataURIUtil {
private static final String PLUGIN_NAME_KEY = "pluginName";
public static final String PLUGIN_NAME_KEY = "pluginName";
private static final String FIELD_SEPARATOR = ".";

View file

@ -6,6 +6,8 @@ archive.cron=0+40+*+*+*+?
archive.purge.enable=true
# purge archives
archive.purge.cron=0+5+0/3+*+*+?
# compress database records
archive.compression.enable=true
# to disable a specific archive, use property archive.disable=pluginName,pluginName...
#archive.disable=grid,text,acars

View file

@ -19,6 +19,8 @@
**/
package com.raytheon.uf.edex.archive;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
@ -80,7 +82,8 @@ import com.raytheon.uf.edex.database.plugin.PluginFactory;
* Jan 18, 2013 1469 bkowal Removed the hdf5 data directory.
* Oct 23, 2013 2478 rferrel Make date format thread safe.
* Add debug information.
* Nov 05, 2013 2499 rjpeter Repackaged, removed config files, always compresses.
* Nov 05, 2013 2499 rjpeter Repackaged, removed config files, always compresses hdf5.
* Nov 11, 2013 2478 rjpeter Updated data store copy to always copy hdf5.
* </pre>
*
* @author rjpeter
@ -114,12 +117,17 @@ public class DatabaseArchiver implements IPluginArchiver {
/** Cluster time out on lock. */
private static final int CLUSTER_LOCK_TIMEOUT = 60000;
/** Chunk size for I/O Buffering and Compression */
private static final int CHUNK_SIZE = 8192;
/** Mapping for plug-in formatters. */
private final Map<String, IPluginArchiveFileNameFormatter> pluginArchiveFormatters;
/** When true dump the pdos. */
private final boolean debugArchiver;
private final boolean compressDatabaseFiles;
/**
* The constructor.
*/
@ -128,6 +136,8 @@ public class DatabaseArchiver implements IPluginArchiver {
pluginArchiveFormatters.put("default",
new DefaultPluginArchiveFileNameFormatter());
debugArchiver = Boolean.getBoolean("archive.debug.enable");
compressDatabaseFiles = Boolean
.getBoolean("archive.compression.enable");
}
@Override
@ -259,12 +269,9 @@ public class DatabaseArchiver implements IPluginArchiver {
.join(archivePath, pluginName, dataStoreFile));
try {
// data must be older than 30 minutes, and no older than
// hours to keep hours need to lookup plugin and see if
// compression matches, or embed in configuration the
// compression level on archive, but would still need to
// lookup plugin
ds.copy(outputDir, compRequired, "lastArchived", 0, 0);
// copy the changed hdf5 file, does repack if
// compRequired, otherwise pure file copy
ds.copy(outputDir, compRequired, null, 0, 0);
} catch (StorageException e) {
statusHandler.handle(Priority.PROBLEM,
e.getLocalizedMessage());
@ -325,7 +332,11 @@ public class DatabaseArchiver implements IPluginArchiver {
path.setLength(path.length() - 3);
}
int pathDebugLength = path.length();
if (compressDatabaseFiles) {
path.append(".bin.gz");
} else {
path.append(".bin");
}
File file = new File(path.toString());
List<PersistableDataObject> pdosToSerialize = entry.getValue();
@ -338,7 +349,13 @@ public class DatabaseArchiver implements IPluginArchiver {
try {
// created gzip'd stream
is = new GZIPInputStream(new FileInputStream(file), 8192);
if (compressDatabaseFiles) {
is = new GZIPInputStream(new FileInputStream(file),
CHUNK_SIZE);
} else {
is = new BufferedInputStream(new FileInputStream(file),
CHUNK_SIZE);
}
// transform back for list append
@SuppressWarnings("unchecked")
@ -400,7 +417,12 @@ public class DatabaseArchiver implements IPluginArchiver {
}
// created gzip'd stream
os = new GZIPOutputStream(new FileOutputStream(file), 8192);
if (compressDatabaseFiles) {
os = new GZIPOutputStream(new FileOutputStream(file), CHUNK_SIZE);
} else {
os = new BufferedOutputStream(new FileOutputStream(file),
CHUNK_SIZE);
}
// Thrift serialize pdo list
SerializationUtil.transformToThriftUsingStream(pdosToSerialize,

View file

@ -27,6 +27,7 @@
* Aug 05, 2013 2224 rferrel Changes to add dataSet tags.
* Oct 01, 2013 2147 rferrel Date time stamp no longer requires an hour field.
* Nov 05, 2013 2497 rferrel Change root directory.
* Nov 13, 2013 2549 rferrel Changes to GFE and modelsounding.
*
* @author rferrel
* @version 1.0
@ -151,7 +152,7 @@
<dateGroupIndices>3,4,5,6</dateGroupIndices>
</dataSet>
<dataSet>
<dirPattern>gfe/(.*)/(Fcst|Official)</dirPattern>
<dirPattern>gfe/(.*)/(.*)</dirPattern>
<filePattern>.*_(\d{4})(\d{2})(\d{2})_.*</filePattern>
<displayLabel>{1} - {2}</displayLabel>
<dateGroupIndices>3,4,5</dateGroupIndices>
@ -177,11 +178,11 @@
<filePattern>.*-(\d{4})-(\d{2})-(\d{2})-(\d{2})-.*</filePattern>
</dataSet>
<dataSet>
<dirPattern>(modelsounding)/(.*)</dirPattern>
<dirPattern>(modelsounding)/(.*)/.*</dirPattern>
<dirPattern>(bufrmos)(.*)</dirPattern>
<displayLabel>{1} - {2}</displayLabel>
<dateGroupIndices>3,4,5,6</dateGroupIndices>
<filePattern>.*(\d{4})-(\d{2})-(\d{2})[-_](\d{2}).*</filePattern>
<filePattern>.*(\d{4})-(\d{2})-(\d{2})-(\d{2}).*</filePattern>
</dataSet>
</category>
<category>

View file

@ -219,6 +219,20 @@
<dateGroupIndices>1,2,3,4</dateGroupIndices>
</dataSet>
</category>
<category>
<name>Radar (Local)</name>
<extRetentionHours>168</extRetentionHours>
<dataSet>
<dirPattern>radar/([k|t|e|f]\w{3})/.*</dirPattern> <!-- one level like GSM or HI --> <!-- e and f are for FAA ASR and ARSR radars -->
<dirPattern>radar/(k...|t...|e...|f...)/.*/.*</dirPattern> <!-- two levels like ML -->
<dirPattern>radar/(k...|t...|e...|f...)/.*/.*/.*</dirPattern> <!-- three levels like ML -->
<dirPattern>radar/(k...|t...|e...|f...)/.*/.*/.*/.*</dirPattern> <!-- four levels like Z -->
<dirPattern>radar/(k...|t...|e...|f...)/.*/.*/.*/.*/.*</dirPattern> <!-- five levels like Z (superres) -->
<filePattern>(\w{4}).(\d*).(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(.*)</filePattern>
<displayLabel>{1}</displayLabel>
<dateGroupIndices>4,5,6,7</dateGroupIndices>
</dataSet>
</category>
<category>
<name>Satellite</name>
<extRetentionHours>168</extRetentionHours>

View file

@ -105,7 +105,7 @@ then
log_msg "Finished running iscMosaic..."
# Generate a GFE message saying new Grids have arrived in Restore database.
cd ${GFESUITE_BIN}
sendGfeMessage -h ${SVCBU_HOST} -p ${CDSPORT} -u -m "Restore database has been populated with new grids."
./sendGfeMessage -h ${SVCBU_HOST} -p ${CDSPORT} -u -m "Restore database has been populated with new grids."
else
log_msg "Unable to locate the gridded data of the site,${import_file} You will need to request your backup site to send grids again."
log_msg 100

View file

@ -1,25 +0,0 @@
Copyright (c) 2009, Swiss AviationSoftware Ltd. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- Neither the name of the Swiss AviationSoftware Ltd. nor the names of its
contributors may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

View file

@ -1,178 +0,0 @@
#
# AWIPS II Eclipse Spec File
#
# --define arguments:
# %{_uframe_eclipse}
# %{_build_root}
# %{_baseline_workspace}
Name: awips2-eclipse
Summary: AWIPS II Eclipse Distribution
Version: 3.6.1
Release: 1
Group: AWIPSII
BuildRoot: %{_build_root}
URL: N/A
License: N/A
Distribution: N/A
Vendor: Raytheon
Packager: Bryan Kowal
AutoReq: no
provides: awips2-eclipse
%description
AWIPS II Eclipse Distribution - Contains the AWIPS II Eclipse Distribution.
# Turn off the brp-python-bytecompile script
%global __os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-python-bytecompile[[:space:]].*$!!g')
%global __os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-java-repack-jars[[:space:]].*$!!g')
%prep
# Verify That The User Has Specified A BuildRoot.
if [ "%{_build_root}" = "/tmp" ]
then
echo "An Actual BuildRoot Must Be Specified. Use The --buildroot Parameter."
echo "Unable To Continue ... Terminating"
exit 1
fi
if [ -d %{_build_root} ]; then
rm -rf %{_build_root}
fi
mkdir -p %{_build_root}/awips2/eclipse
%build
%install
mkdir -p %{_build_root}/awips2/eclipse
# The location of the awips2 eclipse source directory will be
# specified as a command line argument. Fail if the specified
# directory cannot be found.
if [ ! -d %{_uframe_eclipse} ]; then
echo "ERROR: Unable To Find The AWIPS II Eclipse Distribution."
echo "Unable To Continue ... Terminating"
exit 1
fi
# Copy the uframe eclipse distribution.
cp -r %{_uframe_eclipse}/* %{_build_root}/awips2/eclipse
# Copy eclipse.sh to our build-directory.
cp %{_baseline_workspace}/rpms/awips2.ade/Installer.eclipse/scripts/* \
%{_build_root}/awips2/eclipse
# delete the basemaps and etc links
rm -f %{_build_root}/awips2/eclipse/basemaps
rm -f %{_build_root}/awips2/eclipse/etc
%pre
JAVA_INSTALL="<Not Installed>"
PYTHON_INSTALL="<Not Installed>"
ANT_INSTALL="<Not Installed>"
INSTALL_PATH="/awips2/java"
if [ -d ${INSTALL_PATH} ]; then
JAVA_INSTALL=${INSTALL_PATH}
fi
INSTALL_PATH="/awips2/python"
if [ -d ${INSTALL_PATH} ]; then
PYTHON_INSTALL=${INSTALL_PATH}
fi
INSTALL_PATH="/awips2/ant"
if [ -d ${INSTALL_PATH} ]; then
ANT_INSTALL=${INSTALL_PATH}
fi
echo -e "\e[1;34m--------------------------------------------------------------------------------\e[m"
echo -e "\e[1;34m\| Installing the AWIPS II Eclipse Distribution...\e[m"
echo -e "\e[1;34m--------------------------------------------------------------------------------\e[m"
echo -e "\e[1;34m Java Detected At: ${JAVA_INSTALL}\e[m"
echo -e "\e[1;34m Python Detected At: ${PYTHON_INSTALL}\e[m"
echo -e "\e[1;34m Ant Detected At: ${ANT_INSTALL}\e[m"
%post
echo -e "\e[1;34m--------------------------------------------------------------------------------\e[m"
echo -e "\e[1;34m\| Creating ADE Eclipse Desktop Shortcut...\e[m"
echo -e "\e[1;34m--------------------------------------------------------------------------------\e[m"
ADE_ECLIPSE_SHORTCUT="ade-eclipse"
SHORTCUT_OWNER="${USER}"
CREATE_SHORTCUT="true"
if [ ! "${SUDO_USER}" = "" ]; then
SHORTCUT_OWNER="${SUDO_USER}"
fi
echo -e "\e[1;34m Creating Shortcut For User: ${SHORTCUT_OWNER}\e[m"
USER_HOME_DIR="~${SHORTCUT_OWNER}"
if [ ! -d ${USER_HOME_DIR} ]; then
USER_HOME_DIR="/home/${SHORTCUT_OWNER}"
echo " (Assuming User Home Directory Is Under '/home')"
fi
if [ ! -d ${USER_HOME_DIR}/Desktop ]; then
echo -e "\e[1;31m ERROR: Unable To Find The User's Desktop!!!"
CREATE_SHORTCUT="false"
fi
if [ "${CREATE_SHORTCUT}" = "true" ]; then
SHORTCUT_TMP="${USER_HOME_DIR}/Desktop/${ADE_ECLIPSE_SHORTCUT}.tmp"
SHORTCUT="${USER_HOME_DIR}/Desktop/${ADE_ECLIPSE_SHORTCUT}.desktop"
if [ -f ${SHORTCUT} ]; then
echo -n " Attempting To Remove The Existing Shortcut ... "
sudo -u ${SHORTCUT_OWNER} rm -f ${SHORTCUT}
if [ ! -f ${SHORTCUT} ]; then
echo -n "SUCCESS"
else
echo -n "FAILURE"
fi
echo ""
fi
sudo -u ${SHORTCUT_OWNER} touch ${SHORTCUT_TMP}
sudo -u ${SHORTCUT_OWNER} chmod 666 ${SHORTCUT_TMP}
echo "[Desktop Entry]" >> ${SHORTCUT_TMP}
echo "Version=1.0" >> ${SHORTCUT_TMP}
echo "Encoding=UTF-8" >> ${SHORTCUT_TMP}
echo "Name=ADE Eclipse" >> ${SHORTCUT_TMP}
echo "GenericName=Eclipse" >> ${SHORTCUT_TMP}
echo "Comment=IDE" >> ${SHORTCUT_TMP}
echo "Exec=/bin/bash -i -c \"xterm -title 'AWIPS II ADE Eclipse' -e '/awips2/eclipse/eclipseShortcutWrap.sh'\"" >> ${SHORTCUT_TMP}
echo "Icon=/awips2/eclipse/icon.xpm" >> ${SHORTCUT_TMP}
echo "Terminal=false" >> ${SHORTCUT_TMP}
echo "Type=Application" >> ${SHORTCUT_TMP}
echo "Categories=Development;IDE;" >> ${SHORTCUT_TMP}
sudo -u ${SHORTCUT_OWNER} mv ${SHORTCUT_TMP} ${SHORTCUT}
sudo -u ${SHORTCUT_OWNER} chmod 644 ${SHORTCUT}
fi
echo -e "\e[1;32m--------------------------------------------------------------------------------\e[m"
echo -e "\e[1;32m\| AWIPS II Eclipse Distribution Installation - COMPLETE\e[m"
echo -e "\e[1;32m--------------------------------------------------------------------------------\e[m"
%preun
%postun
%clean
rm -rf ${RPM_BUILD_ROOT}
%files
%defattr(644,awips,fxalpha,755)
%dir /awips2/eclipse
/awips2/eclipse/*
%defattr(755,awips,fxalpha,755)
/awips2/eclipse/about.html
/awips2/eclipse/artifacts.xml
/awips2/eclipse/eclipse
/awips2/eclipse/eclipse.ini
/awips2/eclipse/eclipse.sh
/awips2/eclipse/eclipseShortcutWrap.sh
/awips2/eclipse/epl-v10.html
/awips2/eclipse/icon.xpm
/awips2/eclipse/libcairo-swt.so
/awips2/eclipse/notice.html

View file

@ -48,4 +48,4 @@ if [ ! -f ${JAVA} ]; then
exit 1
fi
$JAVA -jar ${QPID_HOME}/bin/yajsw/wrapper.jar -c ${QPID_HOME}/conf/${CONF_FILE}
$JAVA -Xmx32m -XX:MaxPermSize=12m -XX:ReservedCodeCacheSize=4m -jar ${QPID_HOME}/bin/yajsw/wrapper.jar -c ${QPID_HOME}/conf/${CONF_FILE}

View file

@ -14,7 +14,7 @@ diff -crB a/qpid-java.spec b/qpid-java.spec
!
! Name: awips2-qpid-java
Version: 0.18
! Release: 3%{?dist}
! Release: 4%{?dist}
Summary: Java implementation of Apache Qpid
License: Apache Software License
Group: Development/Java

View file

@ -400,14 +400,16 @@ if [ "${1}" = "-viz" ]; then
buildRPM "awips2"
buildRPM "awips2-common-base"
#buildRPM "awips2-python-dynamicserialize"
buildRPM "awips2-python"
buildRPM "awips2-adapt-native"
buildRPM "awips2-gfesuite-client"
buildRPM "awips2-gfesuite-server"
#buildRPM "awips2-python"
#buildRPM "awips2-adapt-native"
#unpackHttpdPypies
#if [ $? -ne 0 ]; then
# exit 1
#fi
#buildRPM "awips2-httpd-pypies"
buildRPM "awips2-hydroapps-shared"
#buildRPM "awips2-hydroapps-shared"
#buildRPM "awips2-rcm"
#buildRPM "awips2-tools"
#buildRPM "awips2-cli"