Issue #1570 add concurrent python functionality, address some comments

Change-Id: I9d7f5491f0895e252d04979458004e64e39f6805

Former-commit-id: 02a6471f1407593fcf39a724eb19bb4a7bc22626
This commit is contained in:
Matt Nash 2013-02-06 12:34:27 -06:00
parent e993b72a39
commit 53f58684a0
12 changed files with 557 additions and 0 deletions

View file

@ -326,4 +326,11 @@
version="0.0.0"
unpack="false"/>
<plugin
id="com.raytheon.uf.common.python.concurrent"
download-size="0"
install-size="0"
version="0.0.0"
unpack="false"/>
</feature>

View file

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="src" path="src"/>
<classpathentry kind="output" path="bin"/>
</classpath>

View file

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>com.raytheon.uf.common.python.concurrent</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.pde.ManifestBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.pde.SchemaBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.pde.PluginNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>

View file

@ -0,0 +1,7 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.6

View file

@ -0,0 +1,9 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Concurrent
Bundle-SymbolicName: com.raytheon.uf.common.python.concurrent
Bundle-Version: 1.0.0.qualifier
Bundle-Vendor: RAYTHEON
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Require-Bundle: com.raytheon.uf.common.python;bundle-version="1.12.1174"
Export-Package: com.raytheon.uf.common.python.concurrent

View file

@ -0,0 +1,4 @@
source.. = src/
output.. = bin/
bin.includes = META-INF/,\
.

View file

@ -0,0 +1,75 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.python.concurrent;
import com.raytheon.uf.common.python.PythonInterpreter;
/**
* Must be extended by all classes that want to run Python on different threads.
* Defines the thread pool, gets the {@link PythonInterpreter}, and tells how to
* execute.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 5, 2013 mnash Initial creation
*
* </pre>
*
* @author mnash
* @version 1.0
*/
public abstract class AbstractPythonScriptFactory<P extends PythonInterpreter> {
private final int maxThreads;
private final String name;
/**
* This method will be called on the Python thread and will instantiate the
* PythonInterpreter that is going to be used.
*
* @return
*/
public abstract P createPythonScript();
public AbstractPythonScriptFactory(String name, int maxThreads) {
this.name = name;
this.maxThreads = maxThreads;
}
/**
* @return the maxThreads
*/
public final int getMaxThreads() {
return maxThreads;
}
/**
* @return the name
*/
public final String getName() {
return name;
}
}

View file

@ -0,0 +1,51 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.python.concurrent;
import com.raytheon.uf.common.python.PythonInterpreter;
/**
* Any class that implements this will be able to execute methods on the
* PythonInterpreter that is passed in
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Feb 5, 2013 mnash Initial creation
*
* </pre>
*
* @author mnash
* @version 1.0
*/
public interface IPythonExecutor<P extends PythonInterpreter, R extends Object> {
/**
* Method takes a {@link PythonInterpreter} and executes the necessary parts
* of it
*
* @param script
* @return
*/
public abstract R execute(P script);
}

View file

@ -0,0 +1,56 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.python.concurrent;
import java.util.EventListener;
/**
* Subclasses can create their own instance of this class to specify what to do
* when the {@link PythonJob} is finished
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 30, 2013 mnash Initial creation
*
* </pre>
*
* @author mnash
* @version 1.0
*/
public interface IPythonJobListener<R extends Object> extends EventListener {
/**
* When a job has finished successfully this will get fired.
*
* @param result
*/
public void jobFinished(R result);
/**
* When a job has finished with exceptions, this will get fired.
*
* @param e
*/
public void jobFailed(Throwable e);
}

View file

@ -0,0 +1,79 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.python.concurrent;
import java.util.concurrent.Callable;
import com.raytheon.uf.common.python.PythonInterpreter;
/**
* Task that will invoke methods on a {@link IPythonExecutor} subclass
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 31, 2013 mnash Initial creation
*
* </pre>
*
* @author mnash
* @version 1.0
*/
public class PythonJob<P extends PythonInterpreter, R extends Object>
implements Callable<R> {
/*
* When code creates a new PythonJob, this gets set to the instance of the
* ThreadLocal that corresponds with whichever thread the application is
* running on
*/
private ThreadLocal<P> threadPython = null;
private IPythonJobListener<R> listener;
private IPythonExecutor<P, R> executor;
public PythonJob(IPythonExecutor<P, R> executor,
IPythonJobListener<R> listener, ThreadLocal<P> threadLocal,
Object... args) {
this.listener = listener;
this.threadPython = threadLocal;
this.executor = executor;
}
@Override
public R call() {
P script = threadPython.get();
R result = null;
try {
result = executor.execute(script);
} catch (Throwable t) {
listener.jobFailed(t);
return null;
}
// fire listener to alert the original caller that we are done
listener.jobFinished(result);
return result;
}
}

View file

@ -0,0 +1,163 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.python.concurrent;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.raytheon.uf.common.python.PythonInterpreter;
/**
* Interface to get to the {@link ExecutorService}. Allows multiple thread pools
* to be created in a single JVM, by passing in a different application name.
*
* This class will be used in this way:
*
*
* <pre>
*
* AbstractPythonScriptFactory<PythonInterpreter, Object> factory = new CAVEPythonFactory();
* PythonJobCoordinator coordinator = PythonJobCoordinator
* .newInstance(factory);
* IPythonExecutor<PythonInterpreter, Object> executor = new CAVEExecutor(
* args);
* try {
* coordinator.submitJob(executor, listener);
* } catch (Exception e) {
* e.printStackTrace();
* }
*
* }
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 31, 2013 mnash Initial creation
*
* </pre>
*
* @author mnash
* @version 1.0
*/
public class PythonJobCoordinator<P extends PythonInterpreter> {
private ExecutorService execService = null;
private ThreadLocal<P> threadLocal = null;
private static Map<String, PythonJobCoordinator<? extends PythonInterpreter>> pools = new ConcurrentHashMap<String, PythonJobCoordinator<? extends PythonInterpreter>>();
private PythonJobCoordinator(final AbstractPythonScriptFactory<P> factory) {
execService = Executors.newFixedThreadPool(factory.getMaxThreads(),
new PythonThreadFactory(factory.getName()));
threadLocal = new ThreadLocal<P>() {
protected P initialValue() {
return factory.createPythonScript();
};
};
}
/**
* Gets the instance by name, or throw a {@link RuntimeException}.
*
* @param name
* @return
*/
public static PythonJobCoordinator<? extends PythonInterpreter> getInstance(
String name) {
synchronized (pools) {
if (pools.containsKey(name)) {
return pools.get(name);
} else {
throw new RuntimeException(
"Unable to find instance of PythonJobCoordinator named "
+ name
+ ", please call newInstance(AbstractPythonScriptFactory)");
}
}
}
/**
* Creates a new instance of this class for a new application. If the same
* name already exists, it assumes that it is the same application and
* returns the existing instance.
*
* @param name
* @param numThreads
* @return
*/
public static <S extends PythonInterpreter> PythonJobCoordinator<S> newInstance(
AbstractPythonScriptFactory<S> factory) {
synchronized (pools) {
if (pools.containsKey(factory.getName())) {
return (PythonJobCoordinator<S>) pools.get(factory.getName());
} else {
PythonJobCoordinator<S> pool = new PythonJobCoordinator<S>(
factory);
pools.put(factory.getName(), pool);
return pool;
}
}
}
/**
* Submits a job to the {@link ExecutorService}.
*
* @param callable
* @return
* @throws Exception
*/
public <R> void submitJob(IPythonExecutor<P, R> executor,
IPythonJobListener<R> listener, Object... args) throws Exception {
// submit job
PythonJob<P, R> job = new PythonJob<P, R>(executor, listener,
threadLocal, args);
execService.submit(job);
}
/**
* This function should take the {@link PythonInterpreter} on each thread in
* the thread pool and dispose of it and then shutdown the
* {@link ExecutorService}
*
* @param name
*/
public void shutdownCoordinator(String name) {
/*
* TODO need to add for future functionality
*/
}
/**
* This function should cancel any listeners for a certain task and then
* remove those corresponding tasks off of the queue to be ran. It should
* NOT try to cancel any running python interpreters.
*
* @param name
*/
public void shutdownTask(String name) {
/*
* TODO need to add for future functionality
*/
}
}

View file

@ -0,0 +1,71 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.python.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Creates new threads named according to what python task they were created
* for. Based nearly identically off of {@link ThreadFactory}
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 31, 2013 mnash Initial creation
*
* </pre>
*
* @author mnash
* @version 1.0
*/
public class PythonThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public PythonThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
.getThreadGroup();
namePrefix = name.toLowerCase() + "-pool-"
+ poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix
+ threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}