Merge "Omaha #2841 Separate queuing of text formatters and product scripts." into omaha_14.4.1

Former-commit-id: e13f80b81df762d6658cce9ad534dc499791ba46
This commit is contained in:
Ron Anderson 2014-05-29 18:50:52 -05:00 committed by Gerrit Code Review
commit 7df866b471
6 changed files with 172 additions and 135 deletions

View file

@ -30,6 +30,8 @@
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 11/20/2013 2488 randerso Changed to use DejaVu fonts
# 05/28/2014 2841 randerso Added separate configurable limits for
# text formatter and product script tasks
GFESUITE_HOME = "/awips2/GFESuite"
GFESUITE_PRDDIR = "/tmp/products"
@ -413,16 +415,27 @@ ProductOutputDialog_nonWrapPils = ['AFM','PFM','FWF','SFT','WCN','FWS']
# Process Monitor Options
#------------------------------------------------------------------------
#
# The maximum number of pending tasks to queue.
#ProcessMonitorMaxPendingTasks = 20
# The maximum number of pending product scripts to queue.
#ProcessMonitorMaxPendingScripts = 10
# The maximum number of finished tasks to keep around (so you can
# The maximum number of finished product scripts to keep around (so you can
# see their output).
#ProcessMonitorMaxOldTasks = 10
#ProcessMonitorMaxOldScripts = 5
# The maximum number of Tasks to run at one time (user can still
# start more via the TaskMonitorDialog).
#ProcessMonitorMaxTasks = 1
# The maximum number of product scripts to run at one time (user can still
# start more via the ProcessMonitorDialog).
#ProcessMonitorMaxScripts = 1
# The maximum number of pending text formatters to queue.
#ProcessMonitorMaxPendingFormatters = 10
# The maximum number of finished text formatters to keep around (so you can
# see their output).
#ProcessMonitorMaxOldFormatters = 5
# The maximum number of text formatters to run at one time (user can still
# start more via the ProcessMonitorDialog).
#ProcessMonitorMaxFormatters = 1
#------------------------------------------------------------------------
# Sample and Legend Colors, Sample Shadows

View file

@ -35,8 +35,6 @@ import java.util.TimeZone;
import org.eclipse.jface.dialogs.IDialogConstants;
import org.eclipse.swt.SWT;
import org.eclipse.swt.custom.ScrolledComposite;
import org.eclipse.swt.events.ControlAdapter;
import org.eclipse.swt.events.ControlEvent;
import org.eclipse.swt.events.SelectionAdapter;
import org.eclipse.swt.events.SelectionEvent;
import org.eclipse.swt.graphics.GC;
@ -71,6 +69,7 @@ import com.raytheon.viz.ui.dialogs.CaveJFACEDialog;
* ------------ ---------- ----------- --------------------------
* Mar 7, 2008 Eric Babin Initial Creation
* 02/12/2013 #1597 randerso Modified TaskOutputDialog to support GFE Performance metrics
* 05/28/2014 #2841 randerso Fixed some NPEs and layout issues
*
* </pre>
*
@ -221,20 +220,10 @@ public class ProcessMonitorDialog extends CaveJFACEDialog implements
layout.horizontalSpacing = 0;
layout.verticalSpacing = 0;
comp.setLayout(layout);
layoutData = new GridData(300, 200);
layoutData = new GridData(300, 120);
comp.setLayoutData(layoutData);
scrolled.setContent(comp);
scrolled.setExpandVertical(true);
scrolled.setMinSize(comp.getSize());
scrolled.addControlListener(new ControlAdapter() {
@Override
public void controlResized(ControlEvent e) {
scrolled.setMinSize(comp.computeSize(SWT.DEFAULT, SWT.DEFAULT));
}
});
scrolled.layout();
return comp;
}
@ -250,8 +239,8 @@ public class ProcessMonitorDialog extends CaveJFACEDialog implements
java.util.List<AbstractGfeTask> taskList = TaskManager.getInstance()
.getTaskList();
for (AbstractGfeTask task : taskList) {
if (task.getStatus() == TaskStatus.FINISHED
|| task.getStatus() == TaskStatus.CANCELED) {
if ((task.getStatus() == TaskStatus.FINISHED)
|| (task.getStatus() == TaskStatus.CANCELED)) {
finished.add(task);
} else {
pending.add(task);
@ -344,10 +333,6 @@ public class ProcessMonitorDialog extends CaveJFACEDialog implements
}
pendingComp.pack();
pendingComp.layout();
ScrolledComposite scrolled = (ScrolledComposite) pendingComp
.getParent();
scrolled.setMinSize(finishedComp.getSize());
scrolled.layout();
// create the finished controls
for (AbstractGfeTask task : finished) {
@ -377,9 +362,6 @@ public class ProcessMonitorDialog extends CaveJFACEDialog implements
}
finishedComp.pack();
finishedComp.layout();
scrolled = (ScrolledComposite) finishedComp.getParent();
scrolled.setMinSize(finishedComp.getSize());
scrolled.layout();
}
private void displayLog(AbstractGfeTask task) {
@ -493,8 +475,11 @@ public class ProcessMonitorDialog extends CaveJFACEDialog implements
label.setText("Elapsed:");
label.setLayoutData(layoutData);
long delta = task.getFinishedTime().getTime()
- task.getStartedTime().getTime();
long delta = 0;
if (task.getStartedTime() != null) {
delta = task.getFinishedTime().getTime()
- task.getStartedTime().getTime();
}
Text elapsedText = new Text(elapsedComp, SWT.BORDER
| SWT.READ_ONLY);
elapsedText.setText(delta + " ms");
@ -570,7 +555,7 @@ public class ProcessMonitorDialog extends CaveJFACEDialog implements
InputStreamReader in = null;
try {
File file = task.getLogFile();
if (file != null && file.exists()) {
if ((file != null) && file.exists()) {
in = new InputStreamReader(new FileInputStream(file),
"UTF-8");
int n = (int) file.length();

View file

@ -36,7 +36,7 @@ import com.raytheon.uf.common.time.SimulatedTime;
import com.raytheon.uf.common.util.FileUtil;
/**
* TODO Add Description
* Base class for GFE tasks
*
* <pre>
*
@ -44,7 +44,8 @@ import com.raytheon.uf.common.util.FileUtil;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 11, 2011 randerso Initial creation
* Apr 11, 2011 randerso Initial creation
* May 28, 2014 #2841 randerso Made TaskScheduler generic
*
* </pre>
*
@ -75,7 +76,7 @@ public abstract class AbstractGfeTask extends Thread implements
private File logFile;
private TaskScheduler scheduler;
private TaskScheduler<AbstractGfeTask> scheduler;
protected AbstractGfeTask(String displayName) {
this.displayName = displayName;

View file

@ -31,7 +31,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.SimulatedTime;
/**
* TODO Add Description
* GFE Product Script Task
*
* <pre>
*
@ -40,8 +40,8 @@ import com.raytheon.uf.common.time.SimulatedTime;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 7, 2011 randerso Initial creation
* Sep 19,2011 10955 rferrel make sure process destroy is called.
*
* Sep 19,2011 10955 rferrel make sure process destroy is called.
* May 28,2014 #2841 randerso Fix null pointer when script is cancelled.
*
* </pre>
*
@ -53,8 +53,6 @@ public class ScriptTask extends AbstractGfeTask {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(ScriptTask.class);
private static String scriptsBaseDir;
private String command;
private ProcessBuilder processBuilder;
@ -74,33 +72,11 @@ public class ScriptTask extends AbstractGfeTask {
@Override
public synchronized void start() {
ProcessBuilder pb = new ProcessBuilder("sh", "-c", getCommand());
// File dir = new File(getBaseDir()).getAbsoluteFile();
// pb.directory(dir);
pb.redirectErrorStream(true);
setProcessBuilder(pb);
super.start();
}
// private String getBaseDir() {
// if (scriptsBaseDir == null) {
// IPathManager pathMgr = PathManagerFactory.getPathManager();
// LocalizationContext caveStaticBase = pathMgr.getContext(
// LocalizationType.CAVE_STATIC, LocalizationLevel.BASE);
// try {
// LocalizationFile lf = pathMgr.getLocalizationFile(
// caveStaticBase, "abc");
// scriptsBaseDir = lf.getFile(false).getParentFile()
// .getAbsolutePath();
//
// } catch (LocalizationException e) {
// statusHandler.handle(Priority.PROBLEM, e.getLocalizedMessage(),
// e);
//
// }
// }
// return scriptsBaseDir;
// }
/*
* (non-Javadoc)
*
@ -123,14 +99,22 @@ public class ScriptTask extends AbstractGfeTask {
log.flush();
}
int exitVal = process.waitFor();
log.write("Exited with error code " + exitVal + "\n");
StringBuilder msg = new StringBuilder("Job ");
msg.append(getDisplayName());
Priority priority = Priority.INFO;
if (exitVal != 0) {
priority = Priority.PROBLEM;
if (process != null) {
int exitVal = process.waitFor();
log.write("Exited with error code " + exitVal + "\n");
if (exitVal != 0) {
priority = Priority.PROBLEM;
}
msg.append(" exited with code: ").append(exitVal);
} else {
msg.append(" was cancelled.");
}
statusHandler.handle(priority, "Job " + getDisplayName()
+ " exited with code: " + exitVal);
statusHandler.handle(priority, msg.toString());
msg.append("\n");
log.write(msg.toString());
} catch (Exception e) {
statusHandler.handle(
Priority.PROBLEM,

View file

@ -19,12 +19,17 @@
**/
package com.raytheon.viz.gfe.tasks;
import java.util.ArrayList;
import java.util.List;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.viz.gfe.Activator;
import com.raytheon.viz.gfe.PythonPreferenceStore;
import com.raytheon.viz.gfe.textformatter.TextFormatter;
/**
* TODO Add Description
* GFE Task Manager
*
* <pre>
*
@ -33,6 +38,9 @@ import com.raytheon.viz.gfe.textformatter.TextFormatter;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Apr 7, 2011 randerso Initial creation
* May 28, 2014 #2841 randerso Separated product script and formatters into
* separate schedulers.
*
*
* </pre>
*
@ -41,41 +49,105 @@ import com.raytheon.viz.gfe.textformatter.TextFormatter;
*/
public class TaskManager {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(TaskManager.class);
private static TaskManager instance = new TaskManager();
public static TaskManager getInstance() {
return instance;
}
private TaskScheduler taskScheduler;
private TaskScheduler<ScriptTask> scriptScheduler;
private TaskScheduler<TextFormatter> formatterScheduler;
private TaskManager() {
taskScheduler = new TaskScheduler();
PythonPreferenceStore prefs = Activator.getDefault()
.getPreferenceStore();
int pendingScriptLimit = 10;
if (prefs.contains("ProcessMonitorMaxPendingScripts")) {
pendingScriptLimit = prefs
.getInt("ProcessMonitorMaxPendingScripts");
}
int runningScriptLimit = 1;
if (prefs.contains("ProcessMonitorMaxScripts")) {
runningScriptLimit = prefs.getInt("ProcessMonitorMaxScripts");
}
int finishedScriptLimit = 5;
if (prefs.contains("ProcessMonitorMaxOldScripts")) {
finishedScriptLimit = prefs.getInt("ProcessMonitorMaxOldScripts");
}
scriptScheduler = new TaskScheduler<ScriptTask>(pendingScriptLimit,
runningScriptLimit, finishedScriptLimit);
int pendingFormatterLimit = 10;
if (prefs.contains("ProcessMonitorMaxPendingFormatters")) {
pendingFormatterLimit = prefs
.getInt("ProcessMonitorMaxPendingFormatters");
}
int runningFormatterLimit = 1;
if (prefs.contains("ProcessMonitorMaxFormatters")) {
runningFormatterLimit = prefs.getInt("ProcessMonitorMaxFormatters");
}
int finishedFormatterLimit = 5;
if (prefs.contains("ProcessMonitorMaxOldFormatters")) {
finishedFormatterLimit = prefs
.getInt("ProcessMonitorMaxOldFormatters");
}
formatterScheduler = new TaskScheduler<TextFormatter>(
pendingFormatterLimit, runningFormatterLimit,
finishedFormatterLimit);
}
public void createScriptTask(String name, String cmd) {
ScriptTask task = new ScriptTask(name, cmd);
taskScheduler.queueTask(task);
scriptScheduler.queueTask(task);
}
public void queueFormatter(TextFormatter formatter) {
taskScheduler.queueTask(formatter);
formatterScheduler.queueTask(formatter);
}
public void forceRunTask(AbstractGfeTask task) {
taskScheduler.forceRunTask(task);
if (task instanceof ScriptTask) {
scriptScheduler.forceRunTask((ScriptTask) task);
} else if (task instanceof TextFormatter) {
formatterScheduler.forceRunTask((TextFormatter) task);
} else {
statusHandler.error("Unknown task type: "
+ task.getClass().getName());
}
}
public void cancelTask(AbstractGfeTask task) {
taskScheduler.cancelTask(task);
if (task instanceof ScriptTask) {
scriptScheduler.cancelTask((ScriptTask) task);
} else if (task instanceof TextFormatter) {
formatterScheduler.cancelTask((TextFormatter) task);
} else {
statusHandler.error("Unknown task type: "
+ task.getClass().getName());
}
}
public List<AbstractGfeTask> getTaskList() {
return taskScheduler.getTaskList();
List<AbstractGfeTask> scriptTasks = scriptScheduler.getTaskList();
List<AbstractGfeTask> formatterTasks = formatterScheduler.getTaskList();
List<AbstractGfeTask> allTasks = new ArrayList<AbstractGfeTask>(
scriptTasks.size() + formatterTasks.size());
allTasks.addAll(scriptTasks);
allTasks.addAll(formatterTasks);
return allTasks;
}
public AbstractGfeTask getTask(String name) {
List<AbstractGfeTask> taskList = taskScheduler.getTaskList();
List<AbstractGfeTask> taskList = this.getTaskList();
for (int i = taskList.size() - 1; i >= 0; i--) {
AbstractGfeTask task = taskList.get(i);
if (name.equals(task.getDisplayName())) {
@ -86,11 +158,13 @@ public class TaskManager {
}
public void addTaskStatusChangedListener(ITaskStatusChangedListener listener) {
taskScheduler.addTaskStatusChangedListener(listener);
scriptScheduler.addTaskStatusChangedListener(listener);
formatterScheduler.addTaskStatusChangedListener(listener);
}
public void removeTaskStatusChangedListener(
ITaskStatusChangedListener listener) {
taskScheduler.removeTaskStatusChangedListener(listener);
scriptScheduler.removeTaskStatusChangedListener(listener);
formatterScheduler.removeTaskStatusChangedListener(listener);
}
}

View file

@ -19,10 +19,11 @@
**/
package com.raytheon.viz.gfe.tasks;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
@ -32,13 +33,10 @@ import org.eclipse.core.runtime.jobs.Job;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.viz.gfe.Activator;
import com.raytheon.viz.gfe.PythonPreferenceStore;
import com.raytheon.viz.gfe.tasks.AbstractGfeTask.TaskStatus;
/**
* TODO Add Description
* GFE Task Scheduler
*
* <pre>
*
@ -48,6 +46,8 @@ import com.raytheon.viz.gfe.tasks.AbstractGfeTask.TaskStatus;
* ------------ ---------- ----------- --------------------------
* Apr 07, 2011 randerso Initial creation
* Mar 03, 2012 #346 dgilling Use identity-based ListenerLists.
* May 28, 2014 #2841 randerso Made scheduler generic so multiple instances
* can be created to process different script types
*
* </pre>
*
@ -55,40 +55,32 @@ import com.raytheon.viz.gfe.tasks.AbstractGfeTask.TaskStatus;
* @version 1.0
*/
public class TaskScheduler extends Job {
public class TaskScheduler<TaskType extends AbstractGfeTask> extends Job {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(TaskScheduler.class);
private List<AbstractGfeTask> allTasks;
private ArrayBlockingQueue<AbstractGfeTask> pendingTasks;
private BlockingQueue<AbstractGfeTask> pendingTasks;
private ArrayBlockingQueue<AbstractGfeTask> finishedTasks;
private BlockingQueue<AbstractGfeTask> finishedTasks;
private int runningTasks, runningTaskLimit;
private int pendingTaskLimit;
private int runningTaskLimit;
private int runningTasks;
private ListenerList listeners;
protected TaskScheduler() {
protected TaskScheduler(int pendingTaskLimit, int runningTaskLimit,
int finishedTaskLimit) {
super("Task Manager Job");
this.runningTaskLimit = runningTaskLimit;
this.pendingTaskLimit = pendingTaskLimit;
listeners = new ListenerList(ListenerList.IDENTITY);
PythonPreferenceStore prefs = Activator.getDefault()
.getPreferenceStore();
int pendingTaskLimit = 20;
if (prefs.contains("ProcessMonitorMaxPendingTasks")) {
pendingTaskLimit = prefs.getInt("ProcessMonitorMaxPendingTasks");
}
runningTaskLimit = 1;
if (prefs.contains("ProcessMonitorMaxTasks")) {
runningTaskLimit = prefs.getInt("ProcessMonitorMaxTasks");
}
int finishedTaskLimit = 10;
if (prefs.contains("ProcessMonitorMaxOldTasks")) {
finishedTaskLimit = prefs.getInt("ProcessMonitorMaxOldTasks");
}
pendingTasks = new ArrayBlockingQueue<AbstractGfeTask>(pendingTaskLimit);
runningTasks = 0;
finishedTasks = new ArrayBlockingQueue<AbstractGfeTask>(
@ -98,53 +90,38 @@ public class TaskScheduler extends Job {
}
protected void queueTask(AbstractGfeTask task) {
allTasks.add(task);
protected void queueTask(TaskType task) {
task.setScheduler(this);
try {
pendingTasks.add(task);
allTasks.add(task);
fireTaskStatusChanged(task);
} catch (IllegalStateException e) {
statusHandler.handle(Priority.PROBLEM, "Unable to queue job "
+ task.getDisplayName(), e);
statusHandler.error(String.format(
"Unable to queue job: %s. Pending task limit (%d) exceeded."
+ task.getDisplayName(), pendingTaskLimit), e);
}
this.schedule();
fireTaskStatusChanged(task);
}
protected synchronized void forceRunTask(AbstractGfeTask task) {
protected synchronized void forceRunTask(TaskType task) {
if (pendingTasks.remove(task)) {
runTask(task);
}
}
protected synchronized void cancelTask(AbstractGfeTask task) {
protected synchronized void cancelTask(TaskType task) {
// remove from pending list in case it's still pending
// will do nothing if it's not in the pending list anymore
pendingTasks.remove(task);
task.cancel();
}
protected synchronized void runTask(AbstractGfeTask task) {
private synchronized void runTask(AbstractGfeTask task) {
System.out.println("runTask");
// try {
// String[] cmdArray = task.getCommand().split(" ");
// File f = new File(scriptsBaseDir + File.separator + cmdArray[0]);
// String dir = f.getParentFile().getAbsolutePath();
// cmdArray[0] = f.getAbsolutePath();
// ProcessBuilder pb = new ProcessBuilder(cmdArray);
// pb.directory(new File(dir));
// pb.redirectErrorStream(true);
// task.setLogFile(File.createTempFile("gfe_", ".log", logBaseDir));
// task.setProcessBuilder(pb);
runningTasks++;
task.start();
// } catch (IOException e) {
// UFStatus.handle(Priority.PROBLEM, Activator.PLUGIN_ID,
// StatusConstants.CATEGORY_GFE, null,
// "Error creating log file for job " + task.getDisplayName()
// + ": " + e.getLocalizedMessage(), e);
// }
this.schedule();
fireTaskStatusChanged(task);
}
@ -159,7 +136,7 @@ public class TaskScheduler extends Job {
protected IStatus run(IProgressMonitor monitor) {
System.out.println("pending: " + pendingTasks.size() + " running: "
+ runningTasks);
while (runningTasks < runningTaskLimit && pendingTasks.size() > 0) {
while ((runningTasks < runningTaskLimit) && (pendingTasks.size() > 0)) {
AbstractGfeTask task = pendingTasks.poll();
runTask(task);
}
@ -167,23 +144,26 @@ public class TaskScheduler extends Job {
return Status.OK_STATUS;
}
protected synchronized void taskCompleted(AbstractGfeTask task) {
protected synchronized void taskCompleted(TaskType task) {
System.out.println("taskCompleted");
runningTasks--;
finishTask(task);
}
protected synchronized void taskCanceled(AbstractGfeTask task) {
protected synchronized void taskCanceled(TaskType task) {
task.status = TaskStatus.CANCELED;
System.out.println("taskCompleted");
finishTask(task);
}
private void finishTask(AbstractGfeTask task) {
private void finishTask(TaskType task) {
if (finishedTasks.remainingCapacity() == 0) {
AbstractGfeTask t = finishedTasks.poll();
allTasks.remove(t);
t.getLogFile().delete();
File logFile = t.getLogFile();
if ((logFile != null) && logFile.exists()) {
logFile.delete();
}
}
finishedTasks.add(task);
@ -198,7 +178,7 @@ public class TaskScheduler extends Job {
}
protected List<AbstractGfeTask> getTaskList() {
return Collections.unmodifiableList(allTasks);
return allTasks;
}
protected void addTaskStatusChangedListener(