Issue #1759: Refactor PurgeGfeGridsRequestHandler and IscMakeRequestHandler to not use

GfeScriptExecutor and GfeScript.

Change-Id: If3c789549a071e54f9020433f6221e688c51c44a

Conflicts:

	edexOsgi/com.raytheon.edex.plugin.gfe/META-INF/MANIFEST.MF


Former-commit-id: 055c36ea41 [formerly 45846ffc137ef816dc149f3f9e46b99dbe608897]
Former-commit-id: 0969f85246
This commit is contained in:
David Gillingham 2013-03-07 15:49:53 -06:00 committed by Steve Harris
parent fdf99a56b2
commit 8859674c70
9 changed files with 311 additions and 194 deletions

View file

@ -27,6 +27,7 @@ Require-Bundle: com.raytheon.uf.common.dataplugin.gfe;bundle-version="1.12.1174"
com.raytheon.uf.common.parameter;bundle-version="1.0.0",
com.raytheon.uf.common.dataplugin.grid;bundle-version="1.0.0",
com.google.guava;bundle-version="1.0.0",
com.raytheon.uf.common.util;bundle-version="1.12.1174",
org.apache.commons.lang;bundle-version="2.3.0"
Export-Package: com.raytheon.edex.plugin.gfe,
com.raytheon.edex.plugin.gfe.config,

View file

@ -24,15 +24,30 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import jep.JepException;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfigManager;
import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
import com.raytheon.edex.plugin.gfe.isc.GfeScript;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.GridLocation;
import com.raytheon.uf.common.dataplugin.gfe.python.GfePyIncludeUtil;
import com.raytheon.uf.common.dataplugin.gfe.request.IscMakeRequest;
import com.raytheon.uf.common.dataplugin.gfe.server.message.ServerResponse;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.python.PyUtil;
import com.raytheon.uf.common.python.PythonScript;
import com.raytheon.uf.common.serialization.comm.IRequestHandler;
import com.raytheon.uf.common.util.FileUtil;
/**
* Processes an ISC grid request from the client
@ -45,6 +60,7 @@ import com.raytheon.uf.common.serialization.comm.IRequestHandler;
* ------------ ---------- ----------- --------------------------
* 08/21/09 1995 bphillip Initial port
* 09/22/09 3058 rjpeter Converted to IRequestHandler
* 03/07/13 1759 dgilling Refactor to not use GfeScript.
* </pre>
*
* @author bphillip
@ -52,17 +68,21 @@ import com.raytheon.uf.common.serialization.comm.IRequestHandler;
*/
public class IscMakeRequestHandler implements IRequestHandler<IscMakeRequest> {
private static Map<String, GfeScript> gfeScripts;
static {
gfeScripts = new HashMap<String, GfeScript>();
}
private static final String SCRIPT_PATH = FileUtil.join(
GfePyIncludeUtil.ISC, "IrtServer.py");
private static final String METHOD_NAME = "makeISCrequest";
private static final ExecutorService scriptRunner = Executors
.newCachedThreadPool();
@Override
public ServerResponse<Object> handleRequest(IscMakeRequest request)
public ServerResponse<Boolean> handleRequest(IscMakeRequest request)
throws Exception {
ServerResponse<Object> response = new ServerResponse<Object>();
String siteID = request.getSiteID();
ServerResponse<Boolean> response = new ServerResponse<Boolean>();
response.setPayload(Boolean.FALSE);
final String siteID = request.getSiteID();
IFPServerConfig config = null;
try {
config = IFPServerConfigManager.getServerConfig(siteID);
@ -71,47 +91,89 @@ public class IscMakeRequestHandler implements IRequestHandler<IscMakeRequest> {
+ siteID + "]");
return response;
}
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext cx = pathMgr.getContext(
LocalizationType.EDEX_STATIC, LocalizationLevel.BASE);
final String scriptPath = pathMgr.getFile(cx, SCRIPT_PATH).getPath();
final String includePath = PyUtil.buildJepIncludePath(
GfePyIncludeUtil.getCommonPythonIncludePath(),
GfePyIncludeUtil.getIscScriptsIncludePath(),
GfePyIncludeUtil.getGfeConfigIncludePath(siteID));
GridLocation domain = config.dbDomain();
List<Integer> gridDims = new ArrayList<Integer>();
gridDims.add(domain.getNy());
gridDims.add(domain.getNx());
List<Double> gridBoundBox = new ArrayList<Double>();
gridBoundBox.add(domain.getOrigin().x);
gridBoundBox.add(domain.getOrigin().y);
gridBoundBox.add(domain.getExtent().x);
gridBoundBox.add(domain.getExtent().y);
final Map<String, Object> args = new HashMap<String, Object>();
args.put("xmlRequest", request.getXml());
args.put("gridDims", gridDims);
args.put("gridProj", domain.getProjection().getProjectionID()
.toString());
args.put("gridBoundBox", gridBoundBox);
args.put("mhs", config.getMhsid());
args.put("host", config.getServerHost());
args.put("port", config.getRpcPort());
args.put("protocol", String.valueOf(config.getProtocolVersion()));
args.put("site", siteID);
args.put("xmtScript", config.transmitScript());
Callable<String> scriptJob = new Callable<String>() {
@Override
public String call() throws Exception {
try {
PythonScript script = null;
try {
script = new PythonScript(scriptPath, includePath);
try {
script.execute(METHOD_NAME, args);
} catch (JepException e) {
String msg = "Error servicing IscMakeRequest from site ["
+ siteID + "]: " + e.getLocalizedMessage();
return msg;
}
} catch (JepException e) {
String msg = "Error initializing IrtServer python script: "
+ e.getLocalizedMessage();
return msg;
} finally {
if (script != null) {
script.dispose();
}
}
} catch (Throwable t) {
String msg = "Error servicing IscMakeRequest from site ["
+ siteID + "]: " + t.getLocalizedMessage();
return msg;
}
return null;
}
};
try {
GridLocation domain = config.dbDomain();
List<Integer> gridDims = new ArrayList<Integer>();
gridDims.add(domain.getNy());
gridDims.add(domain.getNx());
List<Double> gridBoundBox = new ArrayList<Double>();
gridBoundBox.add(domain.getOrigin().x);
gridBoundBox.add(domain.getOrigin().y);
gridBoundBox.add(domain.getExtent().x);
gridBoundBox.add(domain.getExtent().y);
Map<String, Object> args = new HashMap<String, Object>();
args.put("xmlRequest", request.getXml());
args.put("gridDims", gridDims);
args.put("gridProj", domain.getProjection().getProjectionID()
.toString());
args.put("gridBoundBox", gridBoundBox);
args.put("mhs", config.getMhsid());
args.put("host", config.getServerHost());
args.put("port", config.getRpcPort());
args.put("protocol", String.valueOf(config.getProtocolVersion()));
args.put("site", siteID);
args.put("xmtScript", config.transmitScript());
GfeScript script = null;
if (!gfeScripts.containsKey(siteID)) {
GfeScript newScript = new GfeScript("IrtServer", siteID);
newScript.start();
gfeScripts.put(siteID, newScript);
Future<String> result = scriptRunner.submit(scriptJob);
String errorMessage = result.get();
if (errorMessage != null) {
response.addMessage(errorMessage);
return response;
}
script = gfeScripts.get(siteID);
while(script.isRunning()){
Thread.sleep(100);
}
script.execute("makeISCrequest", args);
response.setPayload(script.waitFor());
} catch (Throwable e) {
e.printStackTrace();
} catch (RejectedExecutionException e) {
String msg = "IscMakeRequest job was rejected: "
+ e.getLocalizedMessage();
response.addMessage(msg);
return response;
}
response.setPayload(Boolean.TRUE);
return response;
}
}

View file

@ -19,10 +19,29 @@
**/
package com.raytheon.edex.plugin.gfe.server.handler;
import com.raytheon.edex.plugin.gfe.isc.GfeScriptExecutor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jep.JepException;
import com.raytheon.uf.common.dataplugin.gfe.python.GfePyIncludeUtil;
import com.raytheon.uf.common.dataplugin.gfe.request.PurgeGfeGridsRequest;
import com.raytheon.uf.common.dataplugin.gfe.server.message.ServerResponse;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.python.PyUtil;
import com.raytheon.uf.common.python.PythonScript;
import com.raytheon.uf.common.serialization.comm.IRequestHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.site.SiteAwareRegistry;
/**
* Request handler for PurgeGfeGrids. Will execute the purgeAllGrids.py script
@ -34,6 +53,8 @@ import com.raytheon.uf.common.serialization.comm.IRequestHandler;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 23, 2010 dgilling Initial creation
* Mar 07, 2013 1759 dgilling Refactored to remove dependency
* on GfeScriptExecutor.
*
* </pre>
*
@ -44,20 +65,61 @@ import com.raytheon.uf.common.serialization.comm.IRequestHandler;
public class PurgeGfeGridsRequestHandler implements
IRequestHandler<PurgeGfeGridsRequest> {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(PurgeGfeGridsRequestHandler.class);
private static final String SCRIPT_PATH = FileUtil.join(
GfePyIncludeUtil.ISC, "purgeAllGrids.py");
private static final String METHOD_NAME = "executeFromJava";
@Override
public ServerResponse<String> handleRequest(PurgeGfeGridsRequest request)
public ServerResponse<Boolean> handleRequest(PurgeGfeGridsRequest request)
throws Exception {
ServerResponse<String> sr = new ServerResponse<String>();
GfeScriptExecutor scriptRunner = new GfeScriptExecutor();
ServerResponse<Boolean> sr = new ServerResponse<Boolean>();
sr.setPayload(Boolean.FALSE);
String retVal = scriptRunner.execute("purgeAllGrids "
+ request.getArgString());
if (!retVal.equals(GfeScriptExecutor.SUCCESS)) {
sr.addMessage(retVal);
List<String> siteList = Arrays.asList(SiteAwareRegistry.getInstance()
.getActiveSites());
if (!siteList.contains(request.getSiteID())) {
sr.addMessage("DatabaseID " + request.getDatabaseID()
+ " is unknown.");
return sr;
}
PythonScript script = null;
try {
IPathManager pathMgr = PathManagerFactory.getPathManager();
LocalizationContext cx = pathMgr.getContext(
LocalizationType.EDEX_STATIC, LocalizationLevel.BASE);
String scriptPath = pathMgr.getFile(cx, SCRIPT_PATH).getPath();
String includePath = PyUtil.buildJepIncludePath(
GfePyIncludeUtil.getCommonPythonIncludePath(),
GfePyIncludeUtil.getIscScriptsIncludePath());
script = new PythonScript(scriptPath, includePath);
try {
Map<String, Object> args = new HashMap<String, Object>();
args.put("databaseID", request.getDatabaseID().toString());
script.execute(METHOD_NAME, args);
} catch (JepException e) {
String msg = "Error purging data from DatabaseID ["
+ request.getDatabaseID() + "]";
statusHandler.handle(Priority.PROBLEM, msg, e);
sr.addMessage(msg + ": " + e.getLocalizedMessage());
return sr;
}
} catch (JepException e) {
String msg = "Error initializing purgeAllGrids python script";
statusHandler.handle(Priority.PROBLEM, msg, e);
sr.addMessage(msg + ": " + e.getLocalizedMessage());
return sr;
} finally {
if (script != null) {
script.dispose();
}
}
sr.setPayload(Boolean.TRUE);
return sr;
}
}

View file

@ -17,79 +17,25 @@
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
import string, sys, re, time, types, getopt, LogStream, fnmatch, os
import JUtil
import siteConfig
pytime = time
import sys
import LogStream
from com.raytheon.edex.plugin.gfe.smartinit import IFPDB
from com.raytheon.edex.plugin.gfe.server import GridParmManager
#--------------------------------------------------------------------------
# Main program purges all grids from a database.
#--------------------------------------------------------------------------
#--------------------------------------------------------------------------
# Prints usage statement.
#--------------------------------------------------------------------------
def usage():
ustr = """\
Usage: purgeAllGrids -h hostname -p rpcport -d databaseID
-h hostname: ifpServer host name
-p rpcport: rpc port number of ifpServer
-d databaseID: database identifier\n"""
sys.stderr.write(ustr)
#--------------------------------------------------------------------------
# Parses command line options and sets internal flags
#--------------------------------------------------------------------------
def getOpts(argv):
Options = {'host': None, 'port': None, 'databaseID': None, 'purgeAll':None}
try:
optl, args = getopt.getopt(argv[1:], "h:p:d:a:")
for opt in optl:
if opt[0] == '-h':
Options['host'] = opt[1]
elif opt[0] == '-p':
Options['port'] = int(opt[1])
elif opt[0] == '-d':
Options['databaseID'] = opt[1]
elif opt[0] == '-a':
Options['purgeAll'] = opt[1]
except:
usage()
if Options['host'] is None:
Options['host'] = siteConfig.GFESUITE_SERVER
if Options['port'] is None:
Options['port'] = siteConfig.GFESUITE_PORT
if Options['host'] is None or Options['port'] is None:
usage()
raise SyntaxError, "Command line error"
return Options
#--------------------------------------------------------------------------
# process function
#--------------------------------------------------------------------------
def process(opts):
if opts['purgeAll'] is None:
purgeDB(opts['databaseID'])
else:
dbList = JUtil.javaStringListToPylist(GridParmManager.getDbInventory(opts['purgeAll']).getPayload())
for db in dbList:
purgeDB(db)
def purgeDB(dbname):
# get list of parms
def process(dbname):
LogStream.logEvent("Purging all grids from: ", dbname)
# get list of parms
db = IFPDB(dbname)
parms = db.getKeys()
@ -100,13 +46,19 @@ def purgeDB(dbname):
inv = we.getKeys()
for i in range(0, inv.size()):
we.removeItem(inv.get(i))
#--------------------------------------------------------------------------
# Main program
#--------------------------------------------------------------------------
def main(argv):
argv = JUtil.javaStringListToPylist(argv)
Options = getOpts(argv)
def executeFromJava(databaseID):
LogStream.logEvent("PurgeAllGrids starting")
process(Options)
LogStream.logEvent("PurgeAllGrids finished")
try:
process(databaseID)
LogStream.logEvent("PurgeAllGrids finished")
sys.exit(0)
except SystemExit:
pass
except:
LogStream.logProblem("Caught exception\n", LogStream.exc())

View file

@ -19,6 +19,7 @@
**/
package com.raytheon.uf.common.dataplugin.gfe.request;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
@ -31,6 +32,8 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 23, 2010 dgilling Initial creation
* Mar 07, 2013 1759 dgilling Refactored to use more sensible
* fields.
*
* </pre>
*
@ -42,20 +45,35 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
public class PurgeGfeGridsRequest extends AbstractGfeRequest {
@DynamicSerializeElement
private String argString;
private DatabaseID databaseID;
/**
* @return the argString
*/
public String getArgString() {
return argString;
public PurgeGfeGridsRequest() {
// no-op, for serialization
}
/**
* @param argString
* the argString to set
*/
public void setArgString(String argString) {
this.argString = argString;
public PurgeGfeGridsRequest(DatabaseID dbId) {
this.databaseID = dbId;
this.siteID = databaseID.getSiteId();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("PurgeGfeGridsRequest [databaseID=");
builder.append(databaseID);
builder.append(", workstationID=");
builder.append(workstationID);
builder.append(", siteID=");
builder.append(siteID);
builder.append("]");
return builder.toString();
}
public void setDatabaseID(DatabaseID databaseID) {
this.databaseID = databaseID;
}
public DatabaseID getDatabaseID() {
return databaseID;
}
}

View file

@ -18,14 +18,6 @@
# further licensing information.
##
from dynamicserialize.dstypes.com.raytheon.uf.common.dataplugin.gfe.request import PurgeGfeGridsRequest
from dynamicserialize.dstypes.com.raytheon.uf.common.message import WsId
from dynamicserialize import DynamicSerializationManager
import sys
import os
from ufpy import ThriftClient
from ufpy.UsageOptionParser import UsageOptionParser
#
# Provides a command-line utility to purge selected GFE grids.
#
@ -35,40 +27,63 @@ from ufpy.UsageOptionParser import UsageOptionParser
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 09/23/10 dgilling Initial Creation.
#
# 03/07/13 1759 dgilling Modified to support refactored
# PurgeGfeGridsRequest.
#
#
#
import logging
import os
import sys
from dynamicserialize.dstypes.com.raytheon.uf.common.dataplugin.gfe.request import PurgeGfeGridsRequest
from dynamicserialize.dstypes.com.raytheon.uf.common.message import WsId
from ufpy import ThriftClient
from ufpy import UsageArgumentParser
from ufpy.UsageArgumentParser import StoreDatabaseIDAction as StoreDatabaseIDAction
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
level=logging.DEBUG)
log = logging.getLogger('purgeAllGrids')
def main():
(options, args) = validateArgs()
options = validateArgs()
log.debug("Command-line args: " + repr(options))
try:
purgeRequest = createRequest()
purgeRequest = createRequest(options.databaseID)
log.debug("Sending request: " + str(purgeRequest))
thriftClient = ThriftClient.ThriftClient(options.host, options.port, "/services")
serverResponse = thriftClient.sendRequest(purgeRequest)
except Exception, e:
print >> sys.stderr, "Unhandled exception thrown during grid purge: \n", str(e)
except Exception as e:
log.error("Unhandled exception thrown during grid purge: \n" + str(e))
sys.exit(1)
if (not serverResponse.isOkay()):
print >> sys.stderr, "Errors occurred during grid purge: ", serverResponse.message()
if not serverResponse:
log.error("Errors occurred during grid purge: " + serverResponse.message())
sys.exit(1)
def validateArgs():
usage = "%prog -h hostname -p port -d databaseID"
parser = UsageOptionParser(usage=usage, conflict_handler="resolve")
parser.add_option("-h", action="store", type="string", dest="host",
help="ifpServer host name",
metavar="hostname")
parser.add_option("-p", action="store", type="int", dest="port",
help="port number of the ifpServer",
metavar="port")
parser.add_option("-d", action="store", type="string", dest="databaseID",
help="database identifier",
metavar="databaseID")
usage = "%(prog)s -h hostname -p port -d databaseID"
parser = UsageArgumentParser.UsageArgumentParser(prog='purgeAllGrids',
usage=usage, conflict_handler="resolve")
parser.add_argument("-h", action="store", dest="host",
help="ifpServer host name",
metavar="hostname")
parser.add_argument("-p", action="store", type=int, dest="port",
help="port number of the ifpServer",
metavar="port")
parser.add_argument("-d", action=StoreDatabaseIDAction, dest="databaseID",
required=True, help="database identifier",
metavar="databaseID")
(options, args) = parser.parse_args()
options = parser.parse_args()
if options.host == None:
if "CDSHOST" in os.environ:
@ -81,21 +96,16 @@ def validateArgs():
options.port = int(os.environ["CDSPORT"])
else:
parser.error("No server port defined.")
if (options.databaseID is None):
parser.error("-d: At least one DatabaseID must be provided.")
return (options, args)
return options
def createRequest():
def createRequest(dbId):
obj = PurgeGfeGridsRequest()
obj.setDatabaseID(dbId)
obj.setWorkstationID(WsId(progName="purgeAllGrids"))
obj.setSiteID(dbId.getSiteId())
wsId = WsId(progName="purgeAllGrids")
obj.setWorkstationID(wsId)
obj.setSiteID("")
obj.setArgString(" ".join(sys.argv[1:]))
return obj
if __name__ == '__main__':

View file

@ -20,28 +20,23 @@
# File auto-generated against equivalent DynamicSerialize Java class
class PurgeGfeGridsRequest(object):
from dynamicserialize.dstypes.com.raytheon.uf.common.dataplugin.gfe.request import AbstractGfeRequest
class PurgeGfeGridsRequest(AbstractGfeRequest):
def __init__(self):
self.argString = None
self.workstationID = None
self.siteID = None
super(PurgeGfeGridsRequest, self).__init__()
self.databaseID = None
def __str__(self):
retVal = "PurgeGfeGridsRequest["
retVal += "wokstationID: " + str(self.workstationID) + ", "
retVal += "siteID: " + str(self.siteID) + ", "
retVal += "databaseID: " + str(self.databaseID) + "]"
return retVal
def getArgString(self):
return self.argString
def setArgString(self, argString):
self.argString = argString
def getWorkstationID(self):
return self.workstationID
def setWorkstationID(self, workstationID):
self.workstationID = workstationID
def getSiteID(self):
return self.siteID
def setSiteID(self, siteID):
self.siteID = siteID
def getDatabaseID(self):
return self.databaseID
def setDatabaseID(self, databaseID):
self.databaseID = databaseID

View file

@ -59,4 +59,7 @@ class ServerResponse(object):
return compMessage
def __str__(self):
return self.message()
return self.message()
def __nonzero__(self):
return self.isOkay()

View file

@ -21,6 +21,9 @@
import argparse
import sys
from dynamicserialize.dstypes.com.raytheon.uf.common.dataplugin.gfe.db.objects import DatabaseID
class UsageArgumentParser(argparse.ArgumentParser):
"""
A subclass of ArgumentParser that overrides error() to print the
@ -30,3 +33,14 @@ class UsageArgumentParser(argparse.ArgumentParser):
sys.stderr.write('%s: error: %s\n' % (self.prog, message))
self.print_help()
sys.exit(2)
## Custom actions for ArgumentParser objects ##
class StoreDatabaseIDAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
did = DatabaseID(values)
if did.isValid():
setattr(namespace, self.dest, did)
else:
parser.error("DatabaseID [" + values + "] not a valid identifier")