Merge "Issue #2446 Added cluster lock and check for purgeable data to publish." into development
Former-commit-id:ddcfb3785f
[formerly32131b3b27
] [formerly2220ce9aa3
] [formerlyb5072746fe
[formerly2220ce9aa3
[formerly d41377864a0a20fa6e09f6a035e33402b0d79c16]]] Former-commit-id:b5072746fe
Former-commit-id: c433dcb03f3a6c8f1b7988a97befddc483c4253f [formerlyf71d65d257
] Former-commit-id:feb1655ae3
This commit is contained in:
commit
ca392195cb
3 changed files with 308 additions and 254 deletions
|
@ -83,7 +83,10 @@ import com.raytheon.viz.gfe.core.wxvalue.WxValue;
|
|||
* 02/23/2012 1876 dgilling Implement missing clearUndoParmList
|
||||
* function.
|
||||
* 02/13/2013 #1597 randerso Added logging to support GFE Performance metrics
|
||||
* Feb 15, 2013 1638 mschenke Moved Util.getUnixTime into TimeUtil
|
||||
* 02/15/2013 #1638 mschenke Moved Util.getUnixTime into TimeUtil
|
||||
* 10/15/2013 #2445 randerso Removed expansion of publish time to span of
|
||||
* overlapping grids since this is now done on
|
||||
* the server side
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -153,6 +156,9 @@ public class ParmOp {
|
|||
|
||||
/**
|
||||
* Called when the user changes the selected VectorMode.
|
||||
*
|
||||
* @param mode
|
||||
* the vector edit mode
|
||||
*/
|
||||
public void setVectorMode(ParmState.VectorMode mode) {
|
||||
Iterator<Parm> parmIterator = new ArrayList<Parm>(
|
||||
|
@ -178,6 +184,12 @@ public class ParmOp {
|
|||
|
||||
/**
|
||||
* Copies the specified grid into the copy/paste buffer.
|
||||
*
|
||||
* @param parm
|
||||
* the destination parm
|
||||
* @param date
|
||||
* the destination time
|
||||
* @return true if successful
|
||||
*/
|
||||
public boolean copyGrid(Parm parm, Date date) {
|
||||
|
||||
|
@ -207,7 +219,10 @@ public class ParmOp {
|
|||
* by the GridID.
|
||||
*
|
||||
* @param parm
|
||||
* the destination parm
|
||||
* @param date
|
||||
* the destination time
|
||||
* @return true if successful
|
||||
*/
|
||||
public boolean pasteGrid(Parm parm, Date date) {
|
||||
// is it a compatible grid?
|
||||
|
@ -265,7 +280,7 @@ public class ParmOp {
|
|||
*/
|
||||
public boolean okToPasteGrid(Parm parm, Date date) {
|
||||
// verify we have a valid source grid
|
||||
if (copiedGrid == null || parm == null) {
|
||||
if ((copiedGrid == null) || (parm == null)) {
|
||||
return false;
|
||||
}
|
||||
GridParmInfo sourceGPI = copiedGrid.getParm().getGridInfo();
|
||||
|
@ -363,7 +378,7 @@ public class ParmOp {
|
|||
|
||||
for (Parm parm : allParms) {
|
||||
if (parm.isMutable()) {
|
||||
if (repeatInterval == 0 || duration == 0) {
|
||||
if ((repeatInterval == 0) || (duration == 0)) {
|
||||
TimeConstraints tc = parm.getGridInfo()
|
||||
.getTimeConstraints();
|
||||
parm.createFromScratchSelectedTR(mode,
|
||||
|
@ -475,32 +490,12 @@ public class ParmOp {
|
|||
return;
|
||||
}
|
||||
|
||||
// filter out some of the requests that are not desired
|
||||
final ConcurrentLinkedQueue<CommitGridRequest> requests = new ConcurrentLinkedQueue<CommitGridRequest>();
|
||||
for (CommitGridRequest curReq : req) {
|
||||
// adjust for server's grid inventory and expand tr to include the
|
||||
// entire grid
|
||||
TimeRange tr = curReq.getTimeRange();
|
||||
ParmID id = curReq.getParmId();
|
||||
List<TimeRange> inv;
|
||||
try {
|
||||
inv = dataManager.serverParmInventory(id);
|
||||
for (TimeRange invTR : inv) {
|
||||
if (invTR != null && tr != null && invTR.overlaps(tr)) {
|
||||
tr = tr.combineWith(invTR);
|
||||
} else if (invTR.getStart().after(tr.getEnd())) { // efficiency
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (GFEServerException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Unable to get server parm inventory", e);
|
||||
continue;
|
||||
}
|
||||
// expanding the publish requests to cover the span of the overlapping
|
||||
// grids is done on the server side now
|
||||
|
||||
requests.add(new CommitGridRequest(id, tr, dataManager
|
||||
.clientISCSendStatus()));
|
||||
}
|
||||
// put requests in a queue to be worked of by the publish threads
|
||||
final ConcurrentLinkedQueue<CommitGridRequest> requests = new ConcurrentLinkedQueue<CommitGridRequest>(
|
||||
req);
|
||||
|
||||
final ConcurrentLinkedQueue<ServerResponse<?>> okSrs = new ConcurrentLinkedQueue<ServerResponse<?>>();
|
||||
final AtomicBoolean allOk = new AtomicBoolean(true);
|
||||
|
@ -753,7 +748,7 @@ public class ParmOp {
|
|||
if (source != null) {
|
||||
parm.copySelectedTRFrom(source);
|
||||
}
|
||||
if (created && source != null) {
|
||||
if (created && (source != null)) {
|
||||
parmMgr.deleteParm(source);
|
||||
}
|
||||
}
|
||||
|
@ -829,7 +824,7 @@ public class ParmOp {
|
|||
clearUndoParmList();
|
||||
p.copyTRFrom(source, tr); // perform the copy
|
||||
}
|
||||
if (created && source != null) {
|
||||
if (created && (source != null)) {
|
||||
parmMgr.deleteParm(source);
|
||||
}
|
||||
// if (addedToCache)
|
||||
|
@ -1068,8 +1063,8 @@ public class ParmOp {
|
|||
List<SendISCRequest> requests = new ArrayList<SendISCRequest>();
|
||||
|
||||
// check for auto - mode, single req with NULL values.
|
||||
if (req.size() == 1 && req.get(0).getTimeRange() == null
|
||||
&& req.get(0).getParmId() == null) {
|
||||
if ((req.size() == 1) && (req.get(0).getTimeRange() == null)
|
||||
&& (req.get(0).getParmId() == null)) {
|
||||
requests = req;
|
||||
}
|
||||
|
||||
|
@ -1149,6 +1144,11 @@ public class ParmOp {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the smooth size
|
||||
*
|
||||
* @param smoothSize
|
||||
*/
|
||||
public void setSmoothSize(int smoothSize) {
|
||||
Parm[] allParms = this.dataManager.getParmManager().getAllParms();
|
||||
|
||||
|
@ -1157,6 +1157,14 @@ public class ParmOp {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves all parameters
|
||||
*
|
||||
* @param undisplayed
|
||||
* true to include undisplayed parms
|
||||
* @param displayed
|
||||
* true to include displayed parms
|
||||
*/
|
||||
public void saveAllParameters(boolean undisplayed, boolean displayed) {
|
||||
if (displayed) {
|
||||
Parm[] parms = dataManager.getParmManager().getDisplayedParms();
|
||||
|
|
|
@ -102,6 +102,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
|
|||
* so new GFE databases aren't accidentally created.
|
||||
* 08/05/13 #1571 randerso Added support for storing GridLocation and ParmStorageInfo in database
|
||||
* 09/30/2013 #2147 rferrel Changes to archive hdf5 files.
|
||||
* 10/15/2013 #2446 randerso Added ORDER BY clause to getOverlappingTimes
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -970,7 +971,8 @@ public class GFEDao extends DefaultPluginDao {
|
|||
.find("SELECT dataTime.validPeriod"
|
||||
+ " FROM GFERecord WHERE parmId = ?"
|
||||
+ " AND dataTime.validPeriod.start < ?"
|
||||
+ " AND dataTime.validPeriod.end > ?",
|
||||
+ " AND dataTime.validPeriod.end > ?"
|
||||
+ " ORDER BY dataTime.validPeriod.start",
|
||||
new Object[] { parmId, tr.getEnd(),
|
||||
tr.getStart() });
|
||||
return rval;
|
||||
|
|
|
@ -112,6 +112,10 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
|
|||
* Changed to call D2DGridDatabase.getDatabase instead of calling
|
||||
* the constructor directly to ensure the data exists before creating
|
||||
* the D2DGridDatabase object
|
||||
* 10/10/13 #2446 randerso Added cluster lock to prevent two threads from attempting to publish
|
||||
* the same parm simultaneously.
|
||||
* Added code to check the purge times when publishing and not publish
|
||||
* data that is eligible to be purged.
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -126,7 +130,7 @@ public class GridParmManager {
|
|||
private static final IPerformanceStatusHandler perfLog = PerformanceStatus
|
||||
.getHandler("GFE:");
|
||||
|
||||
protected static final String TASK_NAME = "GridParmManager";
|
||||
private static final String SMART_INIT_TASK_NAME = "GridParmManager";
|
||||
|
||||
private static final String SMART_INIT_TASK_DETAILS = "SmartInit:";
|
||||
|
||||
|
@ -134,6 +138,10 @@ public class GridParmManager {
|
|||
// minutes
|
||||
private static final int SMART_INIT_TIMEOUT = 1800000;
|
||||
|
||||
private static final String COMMIT_GRIDS_TASK_NAME = "commitGrids";
|
||||
|
||||
private static int COMMIT_GRIDS_TIMEOUT = 10000;
|
||||
|
||||
private String siteID;
|
||||
|
||||
private IFPServerConfig config;
|
||||
|
@ -427,6 +435,7 @@ public class GridParmManager {
|
|||
statusHandler.info("Publish/Commit Grids Request: " + parmReq);
|
||||
List<CommitGridRequest> failures = new ArrayList<CommitGridRequest>();
|
||||
|
||||
ITimer lockTimer = TimeUtil.getTimer();
|
||||
ITimer inventoryTimer = TimeUtil.getTimer();
|
||||
ITimer retrieveTimer = TimeUtil.getTimer();
|
||||
ITimer historyRetrieveTimer = TimeUtil.getTimer();
|
||||
|
@ -441,8 +450,8 @@ public class GridParmManager {
|
|||
TimeRange publishTime = req.getTimeRange();
|
||||
|
||||
// for the source data
|
||||
GridParm sourceGP = null;
|
||||
sourceGP = gridParm(req.getParmId());
|
||||
ParmID sourceParmId = req.getParmId();
|
||||
GridParm sourceGP = gridParm(sourceParmId);
|
||||
if (!sourceGP.isValid()) {
|
||||
ssr.addMessage("Unknown Source Parm: " + req.getParmId()
|
||||
+ " in commitGrid()");
|
||||
|
@ -454,6 +463,7 @@ public class GridParmManager {
|
|||
// for the destination data
|
||||
ParmID destParmId = new ParmID(req.getParmId().getParmName(),
|
||||
officialDBid, req.getParmId().getParmLevel());
|
||||
String destParmIdStr = destParmId.toString();
|
||||
GridParm destGP = null;
|
||||
destGP = gridParm(destParmId);
|
||||
if (!destGP.isValid()) {
|
||||
|
@ -482,255 +492,288 @@ public class GridParmManager {
|
|||
continue;
|
||||
}
|
||||
|
||||
// TODO: No need to get inventory and then compare for history
|
||||
// times, just request the history times directly
|
||||
// ClusterLock to ensure only one thread is publishing at a time.
|
||||
lockTimer.start();
|
||||
ClusterTask ct = ClusterLockUtils.lookupLock(
|
||||
COMMIT_GRIDS_TASK_NAME, destParmIdStr);
|
||||
ct = ClusterLockUtils.lock(COMMIT_GRIDS_TASK_NAME, destParmIdStr,
|
||||
COMMIT_GRIDS_TIMEOUT, true);
|
||||
lockTimer.stop();
|
||||
try {
|
||||
|
||||
// get the source data inventory
|
||||
inventoryTimer.start();
|
||||
ServerResponse<List<TimeRange>> invSr = sourceGP.getGridInventory();
|
||||
List<TimeRange> inventory = invSr.getPayload();
|
||||
ssr.addMessages(invSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridInventory for source for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
}
|
||||
// TODO: No need to get inventory and then compare for
|
||||
// history times, just request the history times directly
|
||||
|
||||
// get the destination data inventory
|
||||
invSr = destGP.getGridInventory();
|
||||
inventoryTimer.stop();
|
||||
List<TimeRange> destInventory = invSr.getPayload();
|
||||
ssr.addMessages(invSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridInventory for destination for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
|
||||
// determine set of grids that overlap the commit grid request
|
||||
List<TimeRange> overlapInventory = new ArrayList<TimeRange>();
|
||||
for (TimeRange invTime : inventory) {
|
||||
if (invTime.overlaps(publishTime)) {
|
||||
overlapInventory.add(invTime);
|
||||
} else if (invTime.getStart().after(publishTime.getEnd())) {
|
||||
break;
|
||||
// get later of source and destination purge times
|
||||
Date sourcePurge = purgeTime(sourceParmId.getDbId());
|
||||
Date startTime = purgeTime(destParmId.getDbId());
|
||||
if (sourcePurge.after(startTime)) {
|
||||
startTime = sourcePurge;
|
||||
}
|
||||
}
|
||||
|
||||
// get the source grid data
|
||||
List<IGridSlice> sourceData = null;
|
||||
List<TimeRange> badGridTR = new ArrayList<TimeRange>();
|
||||
// trim publish time to avoid publishing purgeable data
|
||||
if (startTime.after(publishTime.getStart())) {
|
||||
publishTime.setStart(startTime);
|
||||
}
|
||||
|
||||
// System.out.println("overlapInventory initial size "
|
||||
// + overlapInventory.size());
|
||||
inventoryTimer.start();
|
||||
ServerResponse<List<TimeRange>> invSr = sourceGP
|
||||
.getGridInventory(publishTime);
|
||||
List<TimeRange> overlapInventory = invSr.getPayload();
|
||||
ssr.addMessages(invSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridInventory for source for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
}
|
||||
|
||||
historyRetrieveTimer.start();
|
||||
ServerResponse<Map<TimeRange, List<GridDataHistory>>> history = sourceGP
|
||||
.getGridHistory(overlapInventory);
|
||||
Map<TimeRange, List<GridDataHistory>> currentDestHistory = destGP
|
||||
.getGridHistory(overlapInventory).getPayload();
|
||||
historyRetrieveTimer.stop();
|
||||
// expand publish time to span overlapping inventory
|
||||
if (!overlapInventory.isEmpty()) {
|
||||
Date d = overlapInventory.get(0).getStart();
|
||||
if (d.before(publishTime.getStart())) {
|
||||
publishTime.setStart(d);
|
||||
}
|
||||
|
||||
Map<TimeRange, List<GridDataHistory>> historyOnly = new HashMap<TimeRange, List<GridDataHistory>>();
|
||||
for (TimeRange tr : history.getPayload().keySet()) {
|
||||
// should only ever be one history for source grids
|
||||
List<GridDataHistory> gdhList = history.getPayload().get(tr);
|
||||
boolean doPublish = false;
|
||||
for (GridDataHistory gdh : gdhList) {
|
||||
// if update time is less than publish time, grid has not
|
||||
// changed since last published, therefore only update
|
||||
// history, do not publish
|
||||
if ((gdh.getPublishTime() == null)
|
||||
|| (gdh.getUpdateTime().getTime() > gdh
|
||||
.getPublishTime().getTime())
|
||||
// in service backup, times on srcHistory could
|
||||
// appear as not needing a publish, even though
|
||||
// dest data does not exist
|
||||
|| (currentDestHistory.get(tr) == null)
|
||||
|| (currentDestHistory.get(tr).size() == 0)) {
|
||||
doPublish = true;
|
||||
d = overlapInventory.get(overlapInventory.size() - 1)
|
||||
.getEnd();
|
||||
if (d.after(publishTime.getEnd())) {
|
||||
publishTime.setEnd(d);
|
||||
}
|
||||
}
|
||||
if (!doPublish) {
|
||||
historyOnly.put(tr, gdhList);
|
||||
overlapInventory.remove(tr);
|
||||
}
|
||||
}
|
||||
|
||||
retrieveTimer.start();
|
||||
ServerResponse<List<IGridSlice>> getSr = sourceGP.getGridData(
|
||||
new GetGridRequest(req.getParmId(), overlapInventory),
|
||||
badGridTR);
|
||||
retrieveTimer.stop();
|
||||
// System.out.println("Retrieved " + overlapInventory.size()
|
||||
// + " grids");
|
||||
sourceData = getSr.getPayload();
|
||||
ssr.addMessages(getSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridData for source for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
|
||||
// get list of official grids that overlap publish range and
|
||||
// aren't contained in the publish range, these have to be
|
||||
// included in the publish step. Then get the grids, shorten
|
||||
// and insert into sourceData.
|
||||
List<IGridSlice> officialData = new ArrayList<IGridSlice>();
|
||||
List<TimeRange> officialTR = new ArrayList<TimeRange>();
|
||||
for (int t = 0; t < destInventory.size(); t++) {
|
||||
if (destInventory.get(t).overlaps(publishTime)
|
||||
&& !req.getTimeRange().contains(destInventory.get(t))) {
|
||||
officialTR.add(destInventory.get(t));
|
||||
}
|
||||
if (destInventory.get(t).getStart().after(publishTime.getEnd())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!officialTR.isEmpty()) {
|
||||
retrieveTimer.start();
|
||||
getSr = destGP.getGridData(new GetGridRequest(destParmId,
|
||||
officialTR), badGridTR);
|
||||
retrieveTimer.stop();
|
||||
officialData = getSr.getPayload();
|
||||
ssr.addMessages(getSr);
|
||||
invSr = destGP.getGridInventory(publishTime);
|
||||
inventoryTimer.stop();
|
||||
List<TimeRange> destInventory = invSr.getPayload();
|
||||
ssr.addMessages(invSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridData for official for commidtGrid() failure: "
|
||||
ssr.addMessage("GetGridInventory for destination for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
|
||||
// insert the grid into the "sourceGrid" list
|
||||
for (int t = 0; t < officialTR.size(); t++) {
|
||||
// before
|
||||
try {
|
||||
if (officialTR.get(t).getStart()
|
||||
.before(publishTime.getStart())) {
|
||||
// get the source grid data
|
||||
List<IGridSlice> sourceData = null;
|
||||
List<TimeRange> badGridTR = new ArrayList<TimeRange>();
|
||||
|
||||
IGridSlice tempSlice = officialData.get(t).clone();
|
||||
tempSlice
|
||||
.setValidTime(new TimeRange(officialTR.get(
|
||||
t).getStart(), publishTime
|
||||
.getStart()));
|
||||
sourceData.add(0, tempSlice);
|
||||
publishTime.setStart(officialTR.get(t).getStart());
|
||||
overlapInventory.add(tempSlice.getValidTime());
|
||||
}
|
||||
// System.out.println("overlapInventory initial size "
|
||||
// + overlapInventory.size());
|
||||
|
||||
// after
|
||||
if (officialTR.get(t).getEnd()
|
||||
.after(publishTime.getEnd())) {
|
||||
IGridSlice tempSlice = officialData.get(t).clone();
|
||||
tempSlice.setValidTime(new TimeRange(publishTime
|
||||
.getEnd(), officialTR.get(t).getEnd()));
|
||||
sourceData.add(tempSlice);
|
||||
publishTime.setEnd(officialTR.get(t).getEnd());
|
||||
overlapInventory.add(tempSlice.getValidTime());
|
||||
historyRetrieveTimer.start();
|
||||
ServerResponse<Map<TimeRange, List<GridDataHistory>>> history = sourceGP
|
||||
.getGridHistory(overlapInventory);
|
||||
Map<TimeRange, List<GridDataHistory>> currentDestHistory = destGP
|
||||
.getGridHistory(overlapInventory).getPayload();
|
||||
historyRetrieveTimer.stop();
|
||||
|
||||
Map<TimeRange, List<GridDataHistory>> historyOnly = new HashMap<TimeRange, List<GridDataHistory>>();
|
||||
for (TimeRange tr : history.getPayload().keySet()) {
|
||||
// should only ever be one history for source grids
|
||||
List<GridDataHistory> gdhList = history.getPayload()
|
||||
.get(tr);
|
||||
boolean doPublish = false;
|
||||
for (GridDataHistory gdh : gdhList) {
|
||||
// if update time is less than publish time, grid
|
||||
// has not changed since last published,
|
||||
// therefore only update history, do not publish
|
||||
if ((gdh.getPublishTime() == null)
|
||||
|| (gdh.getUpdateTime().getTime() > gdh
|
||||
.getPublishTime().getTime())
|
||||
// in service backup, times on srcHistory
|
||||
// could appear as not needing a publish,
|
||||
// even though dest data does not exist
|
||||
|| (currentDestHistory.get(tr) == null)
|
||||
|| (currentDestHistory.get(tr).size() == 0)) {
|
||||
doPublish = true;
|
||||
}
|
||||
} catch (CloneNotSupportedException e) {
|
||||
sr.addMessage("Error cloning GridSlice "
|
||||
+ e.getMessage());
|
||||
}
|
||||
if (!doPublish) {
|
||||
historyOnly.put(tr, gdhList);
|
||||
overlapInventory.remove(tr);
|
||||
}
|
||||
}
|
||||
|
||||
// adjust publishTime
|
||||
// publishTime = publishTime.combineWith(new
|
||||
// TimeRange(officialTR
|
||||
// .get(0).getStart(), officialTR.get(
|
||||
// officialTR.size() - 1).getEnd()));
|
||||
}
|
||||
|
||||
// save off the source grid history, to update the source database
|
||||
// modify the source grid data for the dest ParmID and
|
||||
// GridDataHistory
|
||||
|
||||
Map<TimeRange, List<GridDataHistory>> histories = new HashMap<TimeRange, List<GridDataHistory>>();
|
||||
Date nowTime = new Date();
|
||||
|
||||
for (IGridSlice slice : sourceData) {
|
||||
GridDataHistory[] sliceHist = slice.getHistory();
|
||||
for (GridDataHistory hist : sliceHist) {
|
||||
hist.setPublishTime((Date) nowTime.clone());
|
||||
retrieveTimer.start();
|
||||
ServerResponse<List<IGridSlice>> getSr = sourceGP.getGridData(
|
||||
new GetGridRequest(req.getParmId(), overlapInventory),
|
||||
badGridTR);
|
||||
retrieveTimer.stop();
|
||||
// System.out.println("Retrieved " + overlapInventory.size()
|
||||
// + " grids");
|
||||
sourceData = getSr.getPayload();
|
||||
ssr.addMessages(getSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridData for source for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
slice.getGridInfo().resetParmID(destParmId);
|
||||
histories.put(slice.getValidTime(), Arrays.asList(sliceHist));
|
||||
}
|
||||
|
||||
// update the history for publish time for grids that are unchanged
|
||||
for (TimeRange tr : historyOnly.keySet()) {
|
||||
List<GridDataHistory> histList = historyOnly.get(tr);
|
||||
for (GridDataHistory hist : histList) {
|
||||
hist.setPublishTime((Date) nowTime.clone());
|
||||
// get list of official grids that overlap publish range and
|
||||
// aren't contained in the publish range, these have to be
|
||||
// included in the publish step. Then get the grids, shorten
|
||||
// and insert into sourceData.
|
||||
List<IGridSlice> officialData = new ArrayList<IGridSlice>();
|
||||
List<TimeRange> officialTR = new ArrayList<TimeRange>();
|
||||
for (int t = 0; t < destInventory.size(); t++) {
|
||||
if (!publishTime.contains(destInventory.get(t))) {
|
||||
officialTR.add(destInventory.get(t));
|
||||
}
|
||||
}
|
||||
histories.put(tr, histList);
|
||||
}
|
||||
|
||||
// update the publish times in the source database, update the
|
||||
// notifications
|
||||
historyUpdateTimer.start();
|
||||
sr.addMessages(sourceGP.updatePublishTime(histories.values(),
|
||||
(Date) nowTime.clone()));
|
||||
// System.out.println("Updated " + histories.size() + " histories");
|
||||
historyUpdateTimer.stop();
|
||||
if (!officialTR.isEmpty()) {
|
||||
retrieveTimer.start();
|
||||
getSr = destGP.getGridData(new GetGridRequest(destParmId,
|
||||
officialTR), badGridTR);
|
||||
retrieveTimer.stop();
|
||||
officialData = getSr.getPayload();
|
||||
ssr.addMessages(getSr);
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("GetGridData for official for commidtGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
|
||||
changes.add(new GridUpdateNotification(req.getParmId(), req
|
||||
.getTimeRange(), histories, requestorId, siteID));
|
||||
// insert the grid into the "sourceGrid" list
|
||||
for (int t = 0; t < officialTR.size(); t++) {
|
||||
// before
|
||||
try {
|
||||
if (officialTR.get(t).getStart()
|
||||
.before(publishTime.getStart())) {
|
||||
|
||||
// update the histories of destination database for ones that
|
||||
// are not going to be saved since there hasn't been a change
|
||||
List<TimeRange> historyOnlyList = new ArrayList<TimeRange>();
|
||||
historyOnlyList.addAll(historyOnly.keySet());
|
||||
IGridSlice tempSlice = officialData.get(t)
|
||||
.clone();
|
||||
tempSlice.setValidTime(new TimeRange(officialTR
|
||||
.get(t).getStart(), publishTime
|
||||
.getStart()));
|
||||
sourceData.add(0, tempSlice);
|
||||
publishTime.setStart(officialTR.get(t)
|
||||
.getStart());
|
||||
overlapInventory.add(tempSlice.getValidTime());
|
||||
}
|
||||
|
||||
historyRetrieveTimer.start();
|
||||
Map<TimeRange, List<GridDataHistory>> destHistory = destGP
|
||||
.getGridHistory(historyOnlyList).getPayload();
|
||||
historyRetrieveTimer.stop();
|
||||
for (TimeRange tr : destHistory.keySet()) {
|
||||
List<GridDataHistory> srcHistList = histories.get(tr);
|
||||
List<GridDataHistory> destHistList = destHistory.get(tr);
|
||||
for (int i = 0; i < srcHistList.size(); i++) {
|
||||
destHistList.get(i).replaceValues(srcHistList.get(i));
|
||||
// after
|
||||
if (officialTR.get(t).getEnd()
|
||||
.after(publishTime.getEnd())) {
|
||||
IGridSlice tempSlice = officialData.get(t)
|
||||
.clone();
|
||||
tempSlice.setValidTime(new TimeRange(
|
||||
publishTime.getEnd(), officialTR.get(t)
|
||||
.getEnd()));
|
||||
sourceData.add(tempSlice);
|
||||
publishTime.setEnd(officialTR.get(t).getEnd());
|
||||
overlapInventory.add(tempSlice.getValidTime());
|
||||
}
|
||||
} catch (CloneNotSupportedException e) {
|
||||
sr.addMessage("Error cloning GridSlice "
|
||||
+ e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// save off the source grid history, to update the source
|
||||
// database modify the source grid data for the dest ParmID and
|
||||
// GridDataHistory
|
||||
Map<TimeRange, List<GridDataHistory>> histories = new HashMap<TimeRange, List<GridDataHistory>>();
|
||||
Date nowTime = new Date();
|
||||
|
||||
for (IGridSlice slice : sourceData) {
|
||||
GridDataHistory[] sliceHist = slice.getHistory();
|
||||
for (GridDataHistory hist : sliceHist) {
|
||||
hist.setPublishTime((Date) nowTime.clone());
|
||||
}
|
||||
slice.getGridInfo().resetParmID(destParmId);
|
||||
histories.put(slice.getValidTime(),
|
||||
Arrays.asList(sliceHist));
|
||||
}
|
||||
|
||||
// update the history for publish time for grids that are
|
||||
// unchanged
|
||||
for (TimeRange tr : historyOnly.keySet()) {
|
||||
List<GridDataHistory> histList = historyOnly.get(tr);
|
||||
for (GridDataHistory hist : histList) {
|
||||
hist.setPublishTime((Date) nowTime.clone());
|
||||
}
|
||||
histories.put(tr, histList);
|
||||
}
|
||||
|
||||
// update the publish times in the source database,
|
||||
// update the notifications
|
||||
historyUpdateTimer.start();
|
||||
sr.addMessages(sourceGP.updatePublishTime(histories.values(),
|
||||
(Date) nowTime.clone()));
|
||||
// System.out.println("Updated " + histories.size() +
|
||||
// " histories");
|
||||
historyUpdateTimer.stop();
|
||||
|
||||
List<TimeRange> historyTimes = new ArrayList<TimeRange>(
|
||||
histories.keySet());
|
||||
Collections.sort(historyTimes);
|
||||
changes.add(new GridUpdateNotification(req.getParmId(),
|
||||
publishTime, histories, requestorId, siteID));
|
||||
|
||||
// update the histories of destination database for ones
|
||||
// that are not going to be saved since there hasn't been a
|
||||
// change
|
||||
List<TimeRange> historyOnlyList = new ArrayList<TimeRange>();
|
||||
historyOnlyList.addAll(historyOnly.keySet());
|
||||
|
||||
historyRetrieveTimer.start();
|
||||
Map<TimeRange, List<GridDataHistory>> destHistory = destGP
|
||||
.getGridHistory(historyOnlyList).getPayload();
|
||||
historyRetrieveTimer.stop();
|
||||
for (TimeRange tr : destHistory.keySet()) {
|
||||
List<GridDataHistory> srcHistList = histories.get(tr);
|
||||
List<GridDataHistory> destHistList = destHistory.get(tr);
|
||||
for (int i = 0; i < srcHistList.size(); i++) {
|
||||
destHistList.get(i).replaceValues(srcHistList.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
// only need to update the publish time on the destination
|
||||
// histories of grids that are not being saved (due to no
|
||||
// changes), because the saveGridSlices() call below will update
|
||||
// the publish time of the ones with changes
|
||||
historyUpdateTimer.start();
|
||||
destGP.updatePublishTime(destHistory.values(),
|
||||
(Date) nowTime.clone());
|
||||
historyUpdateTimer.stop();
|
||||
|
||||
// save data directly to the official database (bypassing
|
||||
// the checks in Parm intentionally)
|
||||
storeTimer.start();
|
||||
ssr.addMessages(officialDBPtr.saveGridSlices(destParmId,
|
||||
publishTime, sourceData, requestorId, historyOnlyList));
|
||||
storeTimer.stop();
|
||||
|
||||
// System.out.println("Published " + sourceData.size() +
|
||||
// " slices");
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("SaveGridData for official for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
|
||||
// make the notification
|
||||
GridUpdateNotification not = new GridUpdateNotification(
|
||||
destParmId, publishTime, histories, requestorId, siteID);
|
||||
changes.add(not);
|
||||
sr.getPayload().add(not);
|
||||
|
||||
} finally {
|
||||
ClusterLockUtils.unlock(ct, false);
|
||||
}
|
||||
|
||||
// only need to update the publish time on the destination histories
|
||||
// of grids that are not being saved (due to no changes), because
|
||||
// the saveGridSlices() call below will update the publish time
|
||||
// of the ones with changes
|
||||
historyUpdateTimer.start();
|
||||
destGP.updatePublishTime(destHistory.values(),
|
||||
(Date) nowTime.clone());
|
||||
historyUpdateTimer.stop();
|
||||
|
||||
// save data directly to the official database (bypassing
|
||||
// the checks in Parm intentionally)
|
||||
storeTimer.start();
|
||||
ssr.addMessages(officialDBPtr.saveGridSlices(destParmId,
|
||||
publishTime, sourceData, requestorId, historyOnlyList));
|
||||
storeTimer.stop();
|
||||
// System.out.println("Published " + sourceData.size() + " slices");
|
||||
if (!ssr.isOkay()) {
|
||||
ssr.addMessage("SaveGridData for official for commitGrid() failure: "
|
||||
+ ssr.message());
|
||||
srDetailed.addMessages(ssr);
|
||||
failures.add(req);
|
||||
continue;
|
||||
}
|
||||
|
||||
// make the notification
|
||||
GridUpdateNotification not = new GridUpdateNotification(destParmId,
|
||||
publishTime, histories, requestorId, siteID);
|
||||
changes.add(not);
|
||||
sr.getPayload().add(not);
|
||||
}
|
||||
|
||||
perfLog.logDuration("Publish Grids: Acquiring cluster lock",
|
||||
lockTimer.getElapsedTime());
|
||||
perfLog.logDuration("Publish Grids: Retrieving inventories",
|
||||
inventoryTimer.getElapsedTime());
|
||||
perfLog.logDuration("Publish Grids: Retrieving histories",
|
||||
|
@ -1159,7 +1202,7 @@ public class GridParmManager {
|
|||
SmartInitQueue queue = SmartInitQueue.getQueue();
|
||||
if (queue != null) {
|
||||
// acquire cluster lock since only needs to happen once
|
||||
ClusterTask ct = ClusterLockUtils.lookupLock(TASK_NAME,
|
||||
ClusterTask ct = ClusterLockUtils.lookupLock(SMART_INIT_TASK_NAME,
|
||||
SMART_INIT_TASK_DETAILS + siteID);
|
||||
|
||||
// TODO: reconsider this as changes to localConfig may change what
|
||||
|
@ -1167,8 +1210,9 @@ public class GridParmManager {
|
|||
// TODO: re-enable check
|
||||
// if ((ct.getLastExecution() + SMART_INIT_TIMEOUT) < System
|
||||
// .currentTimeMillis()) {
|
||||
ct = ClusterLockUtils.lock(TASK_NAME, SMART_INIT_TASK_DETAILS
|
||||
+ siteID, SMART_INIT_TIMEOUT, false);
|
||||
ct = ClusterLockUtils
|
||||
.lock(SMART_INIT_TASK_NAME, SMART_INIT_TASK_DETAILS
|
||||
+ siteID, SMART_INIT_TIMEOUT, false);
|
||||
if (LockState.SUCCESSFUL.equals(ct.getLockState())) {
|
||||
boolean clearTime = false;
|
||||
try {
|
||||
|
|
Loading…
Add table
Reference in a new issue