Merge "Issue #2446 Added cluster lock and check for purgeable data to publish." into development

Former-commit-id: 32131b3b27 [formerly 2220ce9aa3] [formerly b5072746fe [formerly d41377864a0a20fa6e09f6a035e33402b0d79c16]]
Former-commit-id: b5072746fe
Former-commit-id: f71d65d257
This commit is contained in:
Richard Peter 2013-10-23 16:57:28 -05:00 committed by Gerrit Code Review
commit feb1655ae3
3 changed files with 308 additions and 254 deletions

View file

@ -83,7 +83,10 @@ import com.raytheon.viz.gfe.core.wxvalue.WxValue;
* 02/23/2012 1876 dgilling Implement missing clearUndoParmList
* function.
* 02/13/2013 #1597 randerso Added logging to support GFE Performance metrics
* Feb 15, 2013 1638 mschenke Moved Util.getUnixTime into TimeUtil
* 02/15/2013 #1638 mschenke Moved Util.getUnixTime into TimeUtil
* 10/15/2013 #2445 randerso Removed expansion of publish time to span of
* overlapping grids since this is now done on
* the server side
*
* </pre>
*
@ -153,6 +156,9 @@ public class ParmOp {
/**
* Called when the user changes the selected VectorMode.
*
* @param mode
* the vector edit mode
*/
public void setVectorMode(ParmState.VectorMode mode) {
Iterator<Parm> parmIterator = new ArrayList<Parm>(
@ -178,6 +184,12 @@ public class ParmOp {
/**
* Copies the specified grid into the copy/paste buffer.
*
* @param parm
* the destination parm
* @param date
* the destination time
* @return true if successful
*/
public boolean copyGrid(Parm parm, Date date) {
@ -207,7 +219,10 @@ public class ParmOp {
* by the GridID.
*
* @param parm
* the destination parm
* @param date
* the destination time
* @return true if successful
*/
public boolean pasteGrid(Parm parm, Date date) {
// is it a compatible grid?
@ -265,7 +280,7 @@ public class ParmOp {
*/
public boolean okToPasteGrid(Parm parm, Date date) {
// verify we have a valid source grid
if (copiedGrid == null || parm == null) {
if ((copiedGrid == null) || (parm == null)) {
return false;
}
GridParmInfo sourceGPI = copiedGrid.getParm().getGridInfo();
@ -363,7 +378,7 @@ public class ParmOp {
for (Parm parm : allParms) {
if (parm.isMutable()) {
if (repeatInterval == 0 || duration == 0) {
if ((repeatInterval == 0) || (duration == 0)) {
TimeConstraints tc = parm.getGridInfo()
.getTimeConstraints();
parm.createFromScratchSelectedTR(mode,
@ -475,32 +490,12 @@ public class ParmOp {
return;
}
// filter out some of the requests that are not desired
final ConcurrentLinkedQueue<CommitGridRequest> requests = new ConcurrentLinkedQueue<CommitGridRequest>();
for (CommitGridRequest curReq : req) {
// adjust for server's grid inventory and expand tr to include the
// entire grid
TimeRange tr = curReq.getTimeRange();
ParmID id = curReq.getParmId();
List<TimeRange> inv;
try {
inv = dataManager.serverParmInventory(id);
for (TimeRange invTR : inv) {
if (invTR != null && tr != null && invTR.overlaps(tr)) {
tr = tr.combineWith(invTR);
} else if (invTR.getStart().after(tr.getEnd())) { // efficiency
break;
}
}
} catch (GFEServerException e) {
statusHandler.handle(Priority.PROBLEM,
"Unable to get server parm inventory", e);
continue;
}
// expanding the publish requests to cover the span of the overlapping
// grids is done on the server side now
requests.add(new CommitGridRequest(id, tr, dataManager
.clientISCSendStatus()));
}
// put requests in a queue to be worked off by the publish threads
final ConcurrentLinkedQueue<CommitGridRequest> requests = new ConcurrentLinkedQueue<CommitGridRequest>(
req);
final ConcurrentLinkedQueue<ServerResponse<?>> okSrs = new ConcurrentLinkedQueue<ServerResponse<?>>();
final AtomicBoolean allOk = new AtomicBoolean(true);
@ -753,7 +748,7 @@ public class ParmOp {
if (source != null) {
parm.copySelectedTRFrom(source);
}
if (created && source != null) {
if (created && (source != null)) {
parmMgr.deleteParm(source);
}
}
@ -829,7 +824,7 @@ public class ParmOp {
clearUndoParmList();
p.copyTRFrom(source, tr); // perform the copy
}
if (created && source != null) {
if (created && (source != null)) {
parmMgr.deleteParm(source);
}
// if (addedToCache)
@ -1068,8 +1063,8 @@ public class ParmOp {
List<SendISCRequest> requests = new ArrayList<SendISCRequest>();
// check for auto - mode, single req with NULL values.
if (req.size() == 1 && req.get(0).getTimeRange() == null
&& req.get(0).getParmId() == null) {
if ((req.size() == 1) && (req.get(0).getTimeRange() == null)
&& (req.get(0).getParmId() == null)) {
requests = req;
}
@ -1149,6 +1144,11 @@ public class ParmOp {
}
}
/**
* Sets the smooth size
*
* @param smoothSize
*/
public void setSmoothSize(int smoothSize) {
Parm[] allParms = this.dataManager.getParmManager().getAllParms();
@ -1157,6 +1157,14 @@ public class ParmOp {
}
}
/**
* Saves all parameters
*
* @param undisplayed
* true to include undisplayed parms
* @param displayed
* true to include displayed parms
*/
public void saveAllParameters(boolean undisplayed, boolean displayed) {
if (displayed) {
Parm[] parms = dataManager.getParmManager().getDisplayedParms();

View file

@ -102,6 +102,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* so new GFE databases aren't accidentally created.
* 08/05/13 #1571 randerso Added support for storing GridLocation and ParmStorageInfo in database
* 09/30/2013 #2147 rferrel Changes to archive hdf5 files.
* 10/15/2013 #2446 randerso Added ORDER BY clause to getOverlappingTimes
*
* </pre>
*
@ -970,7 +971,8 @@ public class GFEDao extends DefaultPluginDao {
.find("SELECT dataTime.validPeriod"
+ " FROM GFERecord WHERE parmId = ?"
+ " AND dataTime.validPeriod.start < ?"
+ " AND dataTime.validPeriod.end > ?",
+ " AND dataTime.validPeriod.end > ?"
+ " ORDER BY dataTime.validPeriod.start",
new Object[] { parmId, tr.getEnd(),
tr.getStart() });
return rval;

View file

@ -112,6 +112,10 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
* Changed to call D2DGridDatabase.getDatabase instead of calling
* the constructor directly to ensure the data exists before creating
* the D2DGridDatabase object
* 10/10/13 #2446 randerso Added cluster lock to prevent two threads from attempting to publish
* the same parm simultaneously.
* Added code to check the purge times when publishing and not publish
* data that is eligible to be purged.
*
* </pre>
*
@ -126,7 +130,7 @@ public class GridParmManager {
private static final IPerformanceStatusHandler perfLog = PerformanceStatus
.getHandler("GFE:");
protected static final String TASK_NAME = "GridParmManager";
private static final String SMART_INIT_TASK_NAME = "GridParmManager";
private static final String SMART_INIT_TASK_DETAILS = "SmartInit:";
@ -134,6 +138,10 @@ public class GridParmManager {
// minutes
private static final int SMART_INIT_TIMEOUT = 1800000;
private static final String COMMIT_GRIDS_TASK_NAME = "commitGrids";
private static int COMMIT_GRIDS_TIMEOUT = 10000;
private String siteID;
private IFPServerConfig config;
@ -427,6 +435,7 @@ public class GridParmManager {
statusHandler.info("Publish/Commit Grids Request: " + parmReq);
List<CommitGridRequest> failures = new ArrayList<CommitGridRequest>();
ITimer lockTimer = TimeUtil.getTimer();
ITimer inventoryTimer = TimeUtil.getTimer();
ITimer retrieveTimer = TimeUtil.getTimer();
ITimer historyRetrieveTimer = TimeUtil.getTimer();
@ -441,8 +450,8 @@ public class GridParmManager {
TimeRange publishTime = req.getTimeRange();
// for the source data
GridParm sourceGP = null;
sourceGP = gridParm(req.getParmId());
ParmID sourceParmId = req.getParmId();
GridParm sourceGP = gridParm(sourceParmId);
if (!sourceGP.isValid()) {
ssr.addMessage("Unknown Source Parm: " + req.getParmId()
+ " in commitGrid()");
@ -454,6 +463,7 @@ public class GridParmManager {
// for the destination data
ParmID destParmId = new ParmID(req.getParmId().getParmName(),
officialDBid, req.getParmId().getParmLevel());
String destParmIdStr = destParmId.toString();
GridParm destGP = null;
destGP = gridParm(destParmId);
if (!destGP.isValid()) {
@ -482,13 +492,34 @@ public class GridParmManager {
continue;
}
// TODO: No need to get inventory and then compare for history
// times, just request the history times directly
// ClusterLock to ensure only one thread is publishing at a time.
lockTimer.start();
ClusterTask ct = ClusterLockUtils.lookupLock(
COMMIT_GRIDS_TASK_NAME, destParmIdStr);
ct = ClusterLockUtils.lock(COMMIT_GRIDS_TASK_NAME, destParmIdStr,
COMMIT_GRIDS_TIMEOUT, true);
lockTimer.stop();
try {
// TODO: No need to get inventory and then compare for
// history times, just request the history times directly
// get later of source and destination purge times
Date sourcePurge = purgeTime(sourceParmId.getDbId());
Date startTime = purgeTime(destParmId.getDbId());
if (sourcePurge.after(startTime)) {
startTime = sourcePurge;
}
// trim publish time to avoid publishing purgeable data
if (startTime.after(publishTime.getStart())) {
publishTime.setStart(startTime);
}
// get the source data inventory
inventoryTimer.start();
ServerResponse<List<TimeRange>> invSr = sourceGP.getGridInventory();
List<TimeRange> inventory = invSr.getPayload();
ServerResponse<List<TimeRange>> invSr = sourceGP
.getGridInventory(publishTime);
List<TimeRange> overlapInventory = invSr.getPayload();
ssr.addMessages(invSr);
if (!ssr.isOkay()) {
ssr.addMessage("GetGridInventory for source for commitGrid() failure: "
@ -497,8 +528,21 @@ public class GridParmManager {
failures.add(req);
}
// get the destination data inventory
invSr = destGP.getGridInventory();
// expand publish time to span overlapping inventory
if (!overlapInventory.isEmpty()) {
Date d = overlapInventory.get(0).getStart();
if (d.before(publishTime.getStart())) {
publishTime.setStart(d);
}
d = overlapInventory.get(overlapInventory.size() - 1)
.getEnd();
if (d.after(publishTime.getEnd())) {
publishTime.setEnd(d);
}
}
invSr = destGP.getGridInventory(publishTime);
inventoryTimer.stop();
List<TimeRange> destInventory = invSr.getPayload();
ssr.addMessages(invSr);
@ -510,16 +554,6 @@ public class GridParmManager {
continue;
}
// determine set of grids that overlap the commit grid request
List<TimeRange> overlapInventory = new ArrayList<TimeRange>();
for (TimeRange invTime : inventory) {
if (invTime.overlaps(publishTime)) {
overlapInventory.add(invTime);
} else if (invTime.getStart().after(publishTime.getEnd())) {
break;
}
}
// get the source grid data
List<IGridSlice> sourceData = null;
List<TimeRange> badGridTR = new ArrayList<TimeRange>();
@ -537,18 +571,19 @@ public class GridParmManager {
Map<TimeRange, List<GridDataHistory>> historyOnly = new HashMap<TimeRange, List<GridDataHistory>>();
for (TimeRange tr : history.getPayload().keySet()) {
// should only ever be one history for source grids
List<GridDataHistory> gdhList = history.getPayload().get(tr);
List<GridDataHistory> gdhList = history.getPayload()
.get(tr);
boolean doPublish = false;
for (GridDataHistory gdh : gdhList) {
// if update time is less than publish time, grid has not
// changed since last published, therefore only update
// history, do not publish
// if update time is less than publish time, grid
// has not changed since last published,
// therefore only update history, do not publish
if ((gdh.getPublishTime() == null)
|| (gdh.getUpdateTime().getTime() > gdh
.getPublishTime().getTime())
// in service backup, times on srcHistory could
// appear as not needing a publish, even though
// dest data does not exist
// in service backup, times on srcHistory
// could appear as not needing a publish,
// even though dest data does not exist
|| (currentDestHistory.get(tr) == null)
|| (currentDestHistory.get(tr).size() == 0)) {
doPublish = true;
@ -584,14 +619,11 @@ public class GridParmManager {
List<IGridSlice> officialData = new ArrayList<IGridSlice>();
List<TimeRange> officialTR = new ArrayList<TimeRange>();
for (int t = 0; t < destInventory.size(); t++) {
if (destInventory.get(t).overlaps(publishTime)
&& !req.getTimeRange().contains(destInventory.get(t))) {
if (!publishTime.contains(destInventory.get(t))) {
officialTR.add(destInventory.get(t));
}
if (destInventory.get(t).getStart().after(publishTime.getEnd())) {
break;
}
}
if (!officialTR.isEmpty()) {
retrieveTimer.start();
getSr = destGP.getGridData(new GetGridRequest(destParmId,
@ -614,22 +646,25 @@ public class GridParmManager {
if (officialTR.get(t).getStart()
.before(publishTime.getStart())) {
IGridSlice tempSlice = officialData.get(t).clone();
tempSlice
.setValidTime(new TimeRange(officialTR.get(
t).getStart(), publishTime
IGridSlice tempSlice = officialData.get(t)
.clone();
tempSlice.setValidTime(new TimeRange(officialTR
.get(t).getStart(), publishTime
.getStart()));
sourceData.add(0, tempSlice);
publishTime.setStart(officialTR.get(t).getStart());
publishTime.setStart(officialTR.get(t)
.getStart());
overlapInventory.add(tempSlice.getValidTime());
}
// after
if (officialTR.get(t).getEnd()
.after(publishTime.getEnd())) {
IGridSlice tempSlice = officialData.get(t).clone();
tempSlice.setValidTime(new TimeRange(publishTime
.getEnd(), officialTR.get(t).getEnd()));
IGridSlice tempSlice = officialData.get(t)
.clone();
tempSlice.setValidTime(new TimeRange(
publishTime.getEnd(), officialTR.get(t)
.getEnd()));
sourceData.add(tempSlice);
publishTime.setEnd(officialTR.get(t).getEnd());
overlapInventory.add(tempSlice.getValidTime());
@ -639,18 +674,11 @@ public class GridParmManager {
+ e.getMessage());
}
}
// adjust publishTime
// publishTime = publishTime.combineWith(new
// TimeRange(officialTR
// .get(0).getStart(), officialTR.get(
// officialTR.size() - 1).getEnd()));
}
// save off the source grid history, to update the source database
// modify the source grid data for the dest ParmID and
// save off the source grid history, to update the source
// database modify the source grid data for the dest ParmID and
// GridDataHistory
Map<TimeRange, List<GridDataHistory>> histories = new HashMap<TimeRange, List<GridDataHistory>>();
Date nowTime = new Date();
@ -660,10 +688,12 @@ public class GridParmManager {
hist.setPublishTime((Date) nowTime.clone());
}
slice.getGridInfo().resetParmID(destParmId);
histories.put(slice.getValidTime(), Arrays.asList(sliceHist));
histories.put(slice.getValidTime(),
Arrays.asList(sliceHist));
}
// update the history for publish time for grids that are unchanged
// update the history for publish time for grids that are
// unchanged
for (TimeRange tr : historyOnly.keySet()) {
List<GridDataHistory> histList = historyOnly.get(tr);
for (GridDataHistory hist : histList) {
@ -672,19 +702,24 @@ public class GridParmManager {
histories.put(tr, histList);
}
// update the publish times in the source database, update the
// notifications
// update the publish times in the source database,
// update the notifications
historyUpdateTimer.start();
sr.addMessages(sourceGP.updatePublishTime(histories.values(),
(Date) nowTime.clone()));
// System.out.println("Updated " + histories.size() + " histories");
// System.out.println("Updated " + histories.size() +
// " histories");
historyUpdateTimer.stop();
changes.add(new GridUpdateNotification(req.getParmId(), req
.getTimeRange(), histories, requestorId, siteID));
List<TimeRange> historyTimes = new ArrayList<TimeRange>(
histories.keySet());
Collections.sort(historyTimes);
changes.add(new GridUpdateNotification(req.getParmId(),
publishTime, histories, requestorId, siteID));
// update the histories of destination database for ones that
// are not going to be saved since there hasn't been a change
// update the histories of destination database for ones
// that are not going to be saved since there hasn't been a
// change
List<TimeRange> historyOnlyList = new ArrayList<TimeRange>();
historyOnlyList.addAll(historyOnly.keySet());
@ -700,10 +735,10 @@ public class GridParmManager {
}
}
// only need to update the publish time on the destination histories
// of grids that are not being saved (due to no changes), because
// the saveGridSlices() call below will update the publish time
// of the ones with changes
// only need to update the publish time on the destination
// histories of grids that are not being saved (due to no
// changes), because the saveGridSlices() call below will update
// the publish time of the ones with changes
historyUpdateTimer.start();
destGP.updatePublishTime(destHistory.values(),
(Date) nowTime.clone());
@ -715,7 +750,9 @@ public class GridParmManager {
ssr.addMessages(officialDBPtr.saveGridSlices(destParmId,
publishTime, sourceData, requestorId, historyOnlyList));
storeTimer.stop();
// System.out.println("Published " + sourceData.size() + " slices");
// System.out.println("Published " + sourceData.size() +
// " slices");
if (!ssr.isOkay()) {
ssr.addMessage("SaveGridData for official for commitGrid() failure: "
+ ssr.message());
@ -725,12 +762,18 @@ public class GridParmManager {
}
// make the notification
GridUpdateNotification not = new GridUpdateNotification(destParmId,
publishTime, histories, requestorId, siteID);
GridUpdateNotification not = new GridUpdateNotification(
destParmId, publishTime, histories, requestorId, siteID);
changes.add(not);
sr.getPayload().add(not);
} finally {
ClusterLockUtils.unlock(ct, false);
}
}
perfLog.logDuration("Publish Grids: Acquiring cluster lock",
lockTimer.getElapsedTime());
perfLog.logDuration("Publish Grids: Retrieving inventories",
inventoryTimer.getElapsedTime());
perfLog.logDuration("Publish Grids: Retrieving histories",
@ -1159,7 +1202,7 @@ public class GridParmManager {
SmartInitQueue queue = SmartInitQueue.getQueue();
if (queue != null) {
// acquire cluster lock since only needs to happen once
ClusterTask ct = ClusterLockUtils.lookupLock(TASK_NAME,
ClusterTask ct = ClusterLockUtils.lookupLock(SMART_INIT_TASK_NAME,
SMART_INIT_TASK_DETAILS + siteID);
// TODO: reconsider this as changes to localConfig may change what
@ -1167,7 +1210,8 @@ public class GridParmManager {
// TODO: re-enable check
// if ((ct.getLastExecution() + SMART_INIT_TIMEOUT) < System
// .currentTimeMillis()) {
ct = ClusterLockUtils.lock(TASK_NAME, SMART_INIT_TASK_DETAILS
ct = ClusterLockUtils
.lock(SMART_INIT_TASK_NAME, SMART_INIT_TASK_DETAILS
+ siteID, SMART_INIT_TIMEOUT, false);
if (LockState.SUCCESSFUL.equals(ct.getLockState())) {
boolean clearTime = false;