Issue #1941: Speed up iscMosaic by properly utilizing WECache.

Change-Id: Ic8d7ea31cf8f1a989c4f06c0a60c9a351ed8ce8e

Former-commit-id: 60bcd26d36 [formerly 60bcd26d36 [formerly f3d7241b3eafb86046cdcc6c28197244b55f21e9]]
Former-commit-id: 159302cf8c
Former-commit-id: 14701c14d7
This commit is contained in:
David Gillingham 2013-04-25 15:01:04 -05:00
parent e039d2cc13
commit cec8443e3f
2 changed files with 264 additions and 101 deletions

View file

@ -24,7 +24,9 @@ import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.TimeZone;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
@ -35,7 +37,6 @@ import com.raytheon.edex.plugin.gfe.server.lock.LockManager;
import com.raytheon.edex.plugin.gfe.util.SendNotifications;
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.GFERecord;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.GFERecord.GridType;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.GridParmInfo;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.ParmID;
import com.raytheon.uf.common.dataplugin.gfe.discrete.DiscreteKey;
@ -77,6 +78,9 @@ import com.raytheon.uf.common.time.TimeRange;
* Jan 22, 2010 4248 njensen Better error msgs
* Jul 25, 2012 #957 dgilling Implement getEditArea().
* Apr 23, 2013 #1937 dgilling Implement get().
* Apr 23, 2013 #1941 dgilling Implement put(), add methods to build
* Scalar/VectorGridSlices, refactor
* Discrete/WeatherGridSlices builders.
*
* </pre>
*
@ -234,6 +238,76 @@ public class IFPWE {
return rval;
}
/**
 * Stores the provided grid slices into this weather element's permanent
 * storage. The replacement span is locked for the duration of the save and
 * always unlocked afterwards, even if the save fails.
 *
 * @param inventory
 *            A Map of TimeRanges to IGridSlices to be saved. Time is the
 *            slice's valid time.
 * @param timeRangeSpan
 *            The replacement time range of grids to be saved. Must cover
 *            each individual TimeRange in inventory.
 * @throws GfeException
 *             If an error occurs while trying to obtain or release a lock
 *             on the destination database.
 */
public void put(LinkedHashMap<TimeRange, IGridSlice> inventory,
        TimeRange timeRangeSpan) throws GfeException {
    statusHandler.debug("Getting lock for ParmID: " + parmId + " TR: "
            + timeRangeSpan);
    ServerResponse<List<LockTable>> lockResponse = LockManager
            .getInstance().requestLockChange(
                    new LockRequest(parmId, timeRangeSpan, LockMode.LOCK),
                    wsId, siteId);
    if (lockResponse.isOkay()) {
        statusHandler.debug("LOCKING: Lock granted for: " + wsId
                + " for time range: " + timeRangeSpan);
    } else {
        statusHandler.error("Could not lock TimeRange " + timeRangeSpan
                + " for parm [" + parmId + "]: " + lockResponse.message());
        throw new GfeException("Request lock failed. "
                + lockResponse.message());
    }

    // One GFERecord per slice; the slice rides along as the record's
    // message data and supplies its own history.
    List<GFERecord> records = new ArrayList<GFERecord>(inventory.size());
    for (Entry<TimeRange, IGridSlice> entry : inventory.entrySet()) {
        GFERecord rec = new GFERecord(parmId, entry.getKey());
        rec.setGridHistory(entry.getValue().getHistory());
        rec.setMessageData(entry.getValue());
        records.add(rec);
    }
    SaveGridRequest sgr = new SaveGridRequest(parmId, timeRangeSpan,
            records);

    try {
        ServerResponse<?> sr = GridParmManager.saveGridData(
                Arrays.asList(sgr), wsId, siteId);
        if (sr.isOkay()) {
            SendNotifications.send(sr.getNotifications());
        } else {
            // A failed save is logged but deliberately not thrown, so the
            // finally block below still releases the lock.
            statusHandler.error("Unable to save grids for parm [" + parmId
                    + "] over time range " + timeRangeSpan + ": "
                    + sr.message());
        }
    } finally {
        ServerResponse<List<LockTable>> unLockResponse = LockManager
                .getInstance().requestLockChange(
                        new LockRequest(parmId, timeRangeSpan,
                                LockMode.UNLOCK), wsId, siteId);
        if (unLockResponse.isOkay()) {
            statusHandler.debug("LOCKING: Unlocked for: " + wsId + " TR: "
                    + timeRangeSpan);
        } else {
            // FIX: this log previously printed lockResponse.message() —
            // the message from the earlier (successful) LOCK request —
            // instead of the failed unlock's message.
            statusHandler.error("Could not unlock TimeRange "
                    + timeRangeSpan + " for parm [" + parmId + "]: "
                    + unLockResponse.message());
            throw new GfeException("Request unlock failed. "
                    + unLockResponse.message());
        }
    }
}
private void setItem(TimeRange time, IGridSlice gridSlice,
List<GridDataHistory> gdh) throws GfeException {
GFERecord rec = new GFERecord(parmId, time);
@ -373,9 +447,7 @@ public class IFPWE {
public void setItemDiscrete(TimeRange time, byte[] discreteData,
String keys, List<GridDataHistory> gdhList) throws GfeException {
IGridSlice gridSlice = buildDiscreteSlice(time, discreteData, keys,
gpi.getGridType());
gridSlice
.setHistory(gdhList.toArray(new GridDataHistory[gdhList.size()]));
gdhList);
setItem(time, gridSlice, gdhList);
}
@ -393,9 +465,7 @@ public class IFPWE {
public void setItemWeather(TimeRange time, byte[] weatherData, String keys,
List<GridDataHistory> gdhList) throws GfeException {
IGridSlice gridSlice = buildWeatherSlice(time, weatherData, keys,
gpi.getGridType());
gridSlice
.setHistory(gdhList.toArray(new GridDataHistory[gdhList.size()]));
gdhList);
setItem(time, gridSlice, gdhList);
}
@ -433,60 +503,96 @@ public class IFPWE {
return keys.toArray(new String[keys.size()]);
}
/**
 * Builds a ScalarGridSlice to store.
 *
 * @param time
 *            The valid time of the slice.
 * @param data
 *            A float array that corresponds to the slice's data.
 * @param history
 *            The GridDataHistory for the new slice.
 * @return A ScalarGridSlice based on the provided data, valid for the given
 *         time, with the provided history.
 */
public ScalarGridSlice buildScalarSlice(TimeRange time, float[] data,
        List<GridDataHistory> history) {
    // Size the 2D grid to this parm's grid domain.
    int nx = gpi.getGridLoc().getNx();
    int ny = gpi.getGridLoc().getNy();
    Grid2DFloat grid = new Grid2DFloat(nx, ny, data);
    return new ScalarGridSlice(time, gpi, history, grid);
}
/**
 * Builds a VectorGridSlice to store.
 *
 * @param time
 *            The valid time of the slice.
 * @param magData
 *            A float array that corresponds to the slice's magnitude data.
 * @param dirData
 *            A float array that corresponds to the slice's directional
 *            data.
 * @param history
 *            The GridDataHistory for the new slice.
 * @return A VectorGridSlice based on the provided data, valid for the given
 *         time, with the provided history.
 */
public VectorGridSlice buildVectorSlice(TimeRange time, float[] magData,
        float[] dirData, List<GridDataHistory> history) {
    // Both component grids are sized to this parm's grid domain.
    int nx = gpi.getGridLoc().getNx();
    int ny = gpi.getGridLoc().getNy();
    Grid2DFloat magGrid = new Grid2DFloat(nx, ny, magData);
    Grid2DFloat dirGrid = new Grid2DFloat(nx, ny, dirData);
    return new VectorGridSlice(time, gpi, history, magGrid, dirGrid);
}
/**
* Builds a discrete grid slice to store
*
* @param time
* the time of the data
* @param slice
* an Object[] { byte[], String } corresponding to discrete/wx
* types
* @param type
* the type of the data
* The valid time of the data.
* @param bytes
* A byte[] corresponding to discrete
* @param keyString
* Python encoded form of discrete keys.
* @param history
* histories for this grid.
* @return
* @throws GfeException
*/
private IGridSlice buildDiscreteSlice(TimeRange time, byte[] bytes,
String keyString, GridType type) throws GfeException {
public DiscreteGridSlice buildDiscreteSlice(TimeRange time, byte[] bytes,
String keyString, List<GridDataHistory> history) {
List<DiscreteKey> discreteKeyList = new ArrayList<DiscreteKey>();
List<String> keys = GfeUtil.discreteKeyStringToList(keyString);
for (String k : keys) {
discreteKeyList.add(new DiscreteKey(siteId, k, parmId));
}
return new DiscreteGridSlice(
time,
gpi,
new GridDataHistory[] {},
new Grid2DByte(gpi.getGridLoc().getNx(), gpi.getGridLoc()
.getNy(), bytes),
discreteKeyList.toArray(new DiscreteKey[discreteKeyList.size()]));
return new DiscreteGridSlice(time, gpi, history, new Grid2DByte(gpi
.getGridLoc().getNx(), gpi.getGridLoc().getNy(), bytes),
discreteKeyList);
}
/**
* Builds a weather grid slice to store
*
* @param time
* the time of the data
* @param slice
* an Object[] { byte[], String } corresponding to weather/wx
* types
* @param type
* the type of the data
* The valid time of the data.
* @param bytes
* A byte[] corresponding to weather
* @param keyString
* Python encoded form of weather keys.
* @param history
* histories for this grid.
* @return
* @throws GfeException
*/
private IGridSlice buildWeatherSlice(TimeRange time, byte[] bytes,
String keyString, GridType type) throws GfeException {
public WeatherGridSlice buildWeatherSlice(TimeRange time, byte[] bytes,
String keyString, List<GridDataHistory> history) {
List<WeatherKey> weatherKeyList = new ArrayList<WeatherKey>();
List<String> keys = GfeUtil.discreteKeyStringToList(keyString);
for (String k : keys) {
weatherKeyList.add(new WeatherKey(siteId, k));
}
return new WeatherGridSlice(time, gpi, new GridDataHistory[] {},
new Grid2DByte(gpi.getGridLoc().getNx(), gpi.getGridLoc()
.getNy(), bytes),
weatherKeyList.toArray(new WeatherKey[weatherKeyList.size()]));
return new WeatherGridSlice(time, gpi, history, new Grid2DByte(gpi
.getGridLoc().getNx(), gpi.getGridLoc().getNy(), bytes),
weatherKeyList);
}
@Override

View file

@ -32,6 +32,7 @@ import numpy
import JUtil
from java.util import ArrayList
from java.util import LinkedHashMap
from com.raytheon.uf.common.dataplugin.gfe.grid import Grid2DFloat
from com.raytheon.uf.common.dataplugin.gfe.grid import Grid2DByte
from com.raytheon.uf.common.time import TimeRange
@ -75,6 +76,7 @@ from com.raytheon.uf.edex.database.cluster import ClusterTask
# 01/17/13 15588 jdynina Fixed Publish history removal
# 03/12/13 1759 dgilling Remove unnecessary command line
# processing.
# 04/24/13 1941 dgilling Re-port WECache to match A1.
#
#
@ -86,53 +88,49 @@ ISC_USER="isc"
class WECache(object):
def __init__(self, we, tr=None):
self._grids = []
self._hist = []
self._we = we
self._inv = []
theKeys = self._we.getKeys()
for i in range(0, theKeys.size()):
self._inv.append(iscUtil.transformTime(theKeys.get(i)))
self._inv = {}
self._invCache = None
javaInv = self._we.getKeys()
pyInv = []
for i in xrange(javaInv.size()):
pyInv.append(iscUtil.transformTime(javaInv.get(i)))
# Don't get grids outside of the passed-in time range.
if tr:
tokill = []
for i, t in enumerate(self._inv):
for i, t in enumerate(pyInv):
if not self.overlaps(tr, t):
tokill.append(i)
tokill.reverse()
for i in tokill:
del self._inv[i]
del pyInv[i]
javaTRs = ArrayList()
for tr in pyInv:
javaTRs.add(iscUtil.toJavaTimeRange(tr))
gridsAndHist = self._we.get(javaTRs, True)
for idx, tr in enumerate(pyInv):
pair = gridsAndHist.get(idx)
g = self.__encodeGridSlice(pair.getFirst())
h = self.__encodeGridHistory(pair.getSecond())
self._inv[tr] = (g, h)
def keys(self):
return tuple(self._inv)
if not self._invCache:
self._invCache = tuple(sorted(self._inv.keys(), key=lambda t: t[0]))
return self._invCache
def __getitem__(self, key):
grid = self._we.getItem(iscUtil.toJavaTimeRange(key))
history = grid.getGridDataHistory()
hist = []
for i in range(0, history.size()):
hist.append(history.get(i))
gridType = grid.getGridInfo().getGridType().toString()
if gridType == "SCALAR":
return (grid.__numpy__[0], hist)
elif gridType == "VECTOR":
vecGrids = grid.__numpy__
return ((vecGrids[0], vecGrids[1]), hist)
elif gridType == "WEATHER":
keys = grid.getKeys()
keyList = []
for theKey in keys:
keyList.append(theKey.toString())
return ((grid.__numpy__[0], keyList), hist)
elif gridType == "DISCRETE":
keys = grid.getKey()
keyList = []
for theKey in keys:
keyList.append(theKey.toString())
return ((grid.__numpy__[0], keyList), hist)
try:
return self._inv[key]
except KeyError:
grid = self._we.getItem(iscUtil.toJavaTimeRange(key))
pyGrid = self.__encodeGridSlice(grid)
history = grid.getGridDataHistory()
pyHist = self.__encodeGridHistory(history)
return (pyGrid, pyHist)
def __setitem__(self, tr, value):
if value is None:
@ -142,41 +140,50 @@ class WECache(object):
# Remove any overlapping grids
tokill = []
for i, itr in enumerate(self._inv):
for itr in self._inv:
if self.overlaps(tr, itr):
tokill.append(i)
tokill.reverse()
tokill.append(itr)
for i in tokill:
del self._inv[i]
self._invCache = None
# Now add the new grid if it exists
if grid is not None:
timeRange=iscUtil.toJavaTimeRange(tr)
LogStream.logDebug("iscMosaic: Saving Parm:",self._we.getParmid(),"TR:",timeRange)
gridType = self._we.getGridType()
index = bisect.bisect_left(map(lambda x : x[0], self._inv), tr[0])
self._inv.insert(index, tr)
history = ArrayList()
self._inv[tr] = (grid, hist)
self._invCache = None
for h in hist:
dbName = self._we.getParmid().getDbId().toString()
if dbName.find('Fcst') != -1:
#strip out publish time to allow for publishing correctly
#when merging Fcst out of A1
hh = GridDataHistory(h)
hh.setPublishTime(None)
history.add(hh)
else:
history.add(GridDataHistory(h))
if gridType == 'SCALAR':
self._we.setItemScalar(timeRange, grid.astype(numpy.float32), history)
elif gridType == 'VECTOR':
self._we.setItemVector(timeRange, grid[0].astype(numpy.float32), grid[1].astype(numpy.float32), history)
elif gridType == 'WEATHER':
self._we.setItemWeather(timeRange, grid[0].astype(numpy.byte), str(grid[1]), history)
elif gridType == 'DISCRETE':
self._we.setItemDiscrete(timeRange, grid[0].astype(numpy.byte), str(grid[1]), history)
LogStream.logDebug("iscMosaic: Successfully saved Parm:",self._we.getParmid(),"Time:",timeRange)
def flush(self):
    """Actually writes the contents of the WECache to HDF5/DB"""
    # get cache inventory in time range order
    # we want to write to disk in contiguous time range blocks so we only
    # overwrite what we have full sets of grids for.
    inv = list(self.keys())

    # Don't believe the grid slices need to be in time order when saving
    # but leaving them that way just in case.
    gridsToSave = LinkedHashMap()
    # NOTE(review): BATCH_WRITE_COUNT and BATCH_DELAY are module-level
    # constants not visible in this chunk -- confirm their values there.
    while inv:
        # retrieve the next BATCH of grids to persist
        i = inv[:BATCH_WRITE_COUNT]
        # pre-compute the replacement TR for the save requests generated by
        # IFPWE.put().
        # since the inventory is in order it's the start time of the
        # first TR and the end time of the last TR.
        gridSaveTR = iscUtil.toJavaTimeRange((i[0][0], i[-1][1]))
        for tr in i:
            # convert the cached python (grid, history) pair back into
            # Java objects before handing them to IFPWE.put()
            javaTR = iscUtil.toJavaTimeRange(tr)
            pyGrid, pyHist = self._inv[tr]
            javaHist = self.__buildJavaGridHistory(pyHist)
            javaGrid = self.__buildJavaGridSlice(javaTR, pyGrid, javaHist)
            gridsToSave.put(javaTR, javaGrid)
        self._we.put(gridsToSave, gridSaveTR)
        # delete the persisted items from the cache and our copy of the
        # inventory
        gridsToSave.clear()
        for tr in i:
            del self._inv[tr]
        # the sorted-keys cache is now stale; keys() rebuilds it lazily
        self._invCache = None
        inv = inv[BATCH_WRITE_COUNT:]
        # throttle between batches to avoid saturating the server
        time.sleep(BATCH_DELAY)
def overlaps(self, tr1, tr2):
@ -185,6 +192,55 @@ class WECache(object):
return True
return False
def __encodeGridSlice(self, grid):
    """Convert a Java grid slice into the python form cached by WECache.

    Returns, by the parm's grid type:
      SCALAR:   a numpy grid
      VECTOR:   (magnitudeGrid, directionGrid) tuple
      WEATHER:  (byteGrid, [key strings]) tuple
      DISCRETE: (byteGrid, [key strings]) tuple
    Implicitly returns None for any other grid type.
    """
    gridType = self._we.getGridType()
    if gridType == "SCALAR":
        # __numpy__ presumably exposes the Java grid's numpy view;
        # element 0 is the grid itself -- TODO confirm against the bridge
        return grid.__numpy__[0]
    elif gridType == "VECTOR":
        vecGrids = grid.__numpy__
        # (magnitude, direction)
        return (vecGrids[0], vecGrids[1])
    elif gridType == "WEATHER":
        keys = grid.getKeys()
        keyList = []
        for theKey in keys:
            keyList.append(theKey.toString())
        return (grid.__numpy__[0], keyList)
    elif gridType =="DISCRETE":
        # note: discrete slices expose getKey() (singular), unlike
        # weather slices' getKeys()
        keys = grid.getKey()
        keyList = []
        for theKey in keys:
            keyList.append(theKey.toString())
        return (grid.__numpy__[0], keyList)
def __encodeGridHistory(self, histories):
    """Flatten a Java List of GridDataHistory into a tuple of their
    coded-string forms, suitable for caching on the python side."""
    retVal = []
    for i in xrange(histories.size()):
        retVal.append(histories.get(i).getCodedString())
    return tuple(retVal)
def __buildJavaGridSlice(self, tr, grid, history):
    """Inverse of __encodeGridSlice: build a Java grid slice from the
    cached python grid, dispatching on the parm's grid type.

    tr      -- Java TimeRange of the slice
    grid    -- python grid (per-type shapes as produced by the encoder)
    history -- Java List of GridDataHistory
    Implicitly returns None for an unrecognized grid type.
    """
    gridType = self._we.getGridType()
    if gridType == "SCALAR":
        return self._we.buildScalarSlice(tr, grid.astype(numpy.float32), history)
    elif gridType == "VECTOR":
        # grid is a (magnitude, direction) tuple
        return self._we.buildVectorSlice(tr, grid[0].astype(numpy.float32), grid[1].astype(numpy.float32), history)
    elif gridType == "WEATHER":
        # grid is a (byteGrid, keyList) tuple; keys are passed in their
        # python string-encoded form
        return self._we.buildWeatherSlice(tr, grid[0].astype(numpy.byte), str(grid[1]), history)
    elif gridType == "DISCRETE":
        return self._we.buildDiscreteSlice(tr, grid[0].astype(numpy.byte), str(grid[1]), history)
def __buildJavaGridHistory(self, histories):
    """Rebuild a Java ArrayList of GridDataHistory from the coded-string
    histories produced by __encodeGridHistory."""
    retVal = ArrayList()
    # Fcst databases get their publish time blanked (see below)
    blankPubTime = "Fcst" in self._we.getParmid().getDbId().toString()
    for histEntry in histories:
        javaHist = GridDataHistory(histEntry)
        # strip out publish time to allow for publishing correctly
        # when merging Fcst out of A1
        if blankPubTime:
            javaHist.setPublishTime(None)
        retVal.add(javaHist)
    return retVal
class IscMosaic:
@ -549,6 +605,7 @@ class IscMosaic:
# Returns tuple of (parmName, TR, #grids, #fails)
if len(inTimesProc):
totalTimeRange = (inTimesProc[0][0], inTimesProc[ -1][ -1] - 3600)
self._wec.flush()
retryAttempt = retries
except: