Merge "Omaha #2926 switched text subscription to thrift client" into omaha_14.4.1

Former-commit-id: da3e7dbf72 [formerly da3e7dbf72 [formerly 51c60cf39cc950771597ff652c0e683c73812d91]]
Former-commit-id: 88e11a4909
Former-commit-id: 9083d4956b
This commit is contained in:
Nate Jensen 2014-09-08 13:58:00 -05:00 committed by Gerrit Code Review
commit 800ffc4ef9
13 changed files with 255 additions and 159 deletions

View file

@ -0,0 +1,77 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.dataplugin.text.request;
import com.raytheon.uf.common.message.Message;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
import com.raytheon.uf.common.serialization.comm.IServerRequest;
/**
* Object used by thrift clients to make subscription requests
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Sep 5, 2014 2926 bclement Initial creation
*
* </pre>
*
* @author bclement
* @version 1.0
*/
@DynamicSerialize
public class SubscriptionRequest implements IServerRequest {
@DynamicSerializeElement
private Message message;
/**
*
*/
public SubscriptionRequest() {
}
/**
* @param message
*/
public SubscriptionRequest(Message message) {
this.message = message;
}
/**
* @return the message
*/
public Message getMessage() {
return message;
}
/**
* @param message
* the message to set
*/
public void setMessage(Message message) {
this.message = message;
}
}

View file

@ -103,6 +103,13 @@
</bean>
</constructor-arg>
</bean>
<bean factory-bean="handlerRegistry" factory-method="register">
<constructor-arg value="com.raytheon.uf.common.dataplugin.text.request.SubscriptionRequest"/>
<constructor-arg>
<bean class="com.raytheon.uf.edex.plugin.text.subscription.services.SubscriptionRequestHandler" />
</constructor-arg>
</bean>
<bean id="remoteRetrievalManager" class="com.raytheon.uf.edex.plugin.text.handler.RemoteRetrievalManager">
<!-- <property name="remoteRetrievalExternalURI" value="direct-vm:remoteRetrievalExternalRoute" /> -->
@ -127,16 +134,11 @@
<property name="destinationResolver" ref="qpidNoDurableResolver"/>
</bean>
<bean id="subscription" class="com.raytheon.uf.edex.plugin.text.subscription.services.SubscribeManager"/>
<camelContext id="textdbsrv-request-camel" xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="errorHandler">
<proxy id="textdbRemoteRetrievalTarget"
serviceInterface="com.raytheon.uf.edex.plugin.text.handler.RemoteRetrievalManager$IRRDelegate"
serviceUrl="direct-vm:remoteRetrievalExternalRoute" />
<endpoint id="subscriptionHTTP_from"
uri="jetty:http://0.0.0.0:${HTTP_PORT}/services/subscribe?disableStreamCache=true" />
<route id="remoteRetrievalExternal">
<from uri="direct-vm:remoteRetrievalExternalRoute" />
@ -147,11 +149,5 @@
<bean ref="serializationUtil" method="transformFromThrift" />
</route>
<!-- SubscriptionSrv routes -->
<route id="subscriptionHTTP">
<from uri="ref:subscriptionHTTP_from" />
<bean ref="subscription" method="processRequest"/>
<bean ref="serializationUtil" method="marshalToXml" />
</route>
</camelContext>
</beans>

View file

@ -75,6 +75,7 @@ import com.raytheon.uf.edex.plugin.text.subscription.util.TriggerMatcher;
* ------------ ---------- ----------- --------------------------
* 14Nov2008 1709 MW Fegan Initial creation.
* May 22, 2014 2536 bclement moved from autobldsrv to edex.plugin.text
* Sep 05, 2014 2926 bclement added getDirectResults() to avoid marshaling
*
* </pre>
*
@ -83,7 +84,11 @@ import com.raytheon.uf.edex.plugin.text.subscription.util.TriggerMatcher;
*/
public class SubscribeQueryRunner extends ASubscribeRunner {
private List<SubscriptionRecord> results;
private String trigger = null;
/**
* Constructor.
*/
@ -118,10 +123,10 @@ public class SubscribeQueryRunner extends ASubscribeRunner {
} else {
retVal = dao.getSubscriptions();
}
retVal = filterRecords(retVal);
this.results = packageSubscriptions(retVal);
results = filterRecords(retVal);
return false;
}
/**
* Filters the list of records received, limiting to those that match
* the trigger condition.
@ -139,6 +144,7 @@ public class SubscribeQueryRunner extends ASubscribeRunner {
}
return retVal;
}
/**
* Packages the subscriptions for return to the client. Each record
* is bundled within the value of a {@link Property} object with a
@ -165,6 +171,7 @@ public class SubscribeQueryRunner extends ASubscribeRunner {
}
return retVal;
}
/**
* Modifies the list of {@link Property} objects by removing the <em>trigger</em>
* property and adding a <em>active</em> property with a value of <em>TRUE</em>.
@ -185,4 +192,20 @@ public class SubscribeQueryRunner extends ASubscribeRunner {
retVal.add(new Property("active","TRUE"));
return retVal;
}
/*
* (non-Javadoc)
*
* @see
* com.raytheon.uf.edex.plugin.text.subscription.runners.ASubscribeRunner
* #getResults()
*/
@Override
public List<Property> getResults() {
return packageSubscriptions(results);
}
public List<SubscriptionRecord> getDirectResults(){
return results;
}
}

View file

@ -63,6 +63,7 @@ import com.raytheon.uf.edex.plugin.text.subscription.util.SubscribeAction;
* May 22, 2014 2536 bclement moved from autobldsrv to edex.plugin.text
* removed duplicate SubscribeAction enum
* Aug 22, 2014 2926 bclement improved error handling for unknown operation
* Sep 05, 2014 2926 bclement removed Class.forName() call
*
* </pre>
*
@ -134,14 +135,12 @@ public class SubscribeRunner {
"Unable to initialize ISubscriberunner instance; unable to find action for operation ["
+ oper + "]");
}
String className = action.getRunner();
if (className == null) {
Class<? extends ISubscribeRunner> aClass = action.getRunner();
if (aClass == null) {
throw new EdexException("Unable to initialize ISubscribeRunner instance; invalid operation [" + oper + "] specified - unable to continue");
}
Class<?> aClass = null;
try {
aClass = Class.forName(className);
retVal = (ISubscribeRunner)aClass.newInstance();
if (message != null) {
retVal.initialize(message);

View file

@ -26,8 +26,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.JAXBException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,13 +38,10 @@ import com.raytheon.uf.common.dataplugin.text.db.SubscriptionRecord;
import com.raytheon.uf.common.message.Header;
import com.raytheon.uf.common.message.Message;
import com.raytheon.uf.common.message.Property;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.util.ReflectionUtil;
import com.raytheon.uf.common.util.StringUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.plugin.text.subscription.runners.ISubscribeRunner;
import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeRunner;
import com.raytheon.uf.edex.plugin.text.subscription.util.Tools;
import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeQueryRunner;
/**
* Main class of the EDEX Script Runner.
@ -73,6 +68,7 @@ import com.raytheon.uf.edex.plugin.text.subscription.util.Tools;
* argument to the engine.
* Feb 15, 2013 1638 mschenke Moved DataURINotificationMessage to uf.common.dataplugin
* May 22, 2014 2536 bclement moved from autobldsrv to edex.plugin.text
* Sep 05, 2014 2926 bclement get query results directly, removed decodeResponse()
*
* </pre>
*
@ -112,8 +108,7 @@ public class ScriptRunner {
List<String> triggers = decodeTrigger(event);
for (String trigger : triggers) {
Message query = prepareQueryMessage(trigger);
List<Property> reply = querySubscriptions(query);
List<SubscriptionRecord> subscriptions = decodeResponse(reply);
List<SubscriptionRecord> subscriptions = querySubscriptions(query);
for (SubscriptionRecord record : subscriptions) {
if (record.getTrigger().indexOf(" ") > 0) {
trigger = record.getTrigger();
@ -313,40 +308,6 @@ public class ScriptRunner {
return retVal;
}
/**
* Extracts the subscription records from the message.
*
* @param properties
* list of properties containing the query results
*
* @return list of subscription records containing scripts to run
*/
private List<SubscriptionRecord> decodeResponse(List<Property> properties) {
List<SubscriptionRecord> retVal = new ArrayList<SubscriptionRecord>();
for (Property property : properties) {
if ("subscription".equalsIgnoreCase(property.getName())) {
try {
Object obj = SerializationUtil.unmarshalFromXml(property
.getValue());
if (obj instanceof SubscriptionRecord) {
SubscriptionRecord rec = (SubscriptionRecord) obj;
if (rec.getScript() != null) {
rec.setScript(Tools.hexToAscii(rec.getScript()));
}
retVal.add((SubscriptionRecord) obj);
} else {
logger.warn("Unable to extract SubscriptionRecord from Message");
}
} catch (JAXBException e) {
logger.warn(
"Unable to extract SubscriptionRecord from Message",
e);
}
}
}
return retVal;
}
/**
* Performs a query of the database to determine if there are any
* subscriptions that match the trigger criteria.
@ -354,18 +315,16 @@ public class ScriptRunner {
* @param message
* message containing the desired query information
*
* @return property list containing the results of the query
* @return subscription record list containing the results of the query
*
* @throws EdexException
* if an error occurs
*/
private List<Property> querySubscriptions(Message message)
private List<SubscriptionRecord> querySubscriptions(Message message)
throws EdexException {
ISubscribeRunner runner = SubscribeRunner
.getInstance(SUBSCRIBE_OPERATION);
runner.initialize(message);
SubscribeQueryRunner runner = new SubscribeQueryRunner(message);
runner.execute();
return runner.getResults();
return runner.getDirectResults();
}
/**

View file

@ -22,21 +22,18 @@ package com.raytheon.uf.edex.plugin.text.subscription.services;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.raytheon.uf.common.dataplugin.text.request.SubscriptionRequest;
import com.raytheon.uf.common.message.Header;
import com.raytheon.uf.common.message.Message;
import com.raytheon.uf.common.message.Property;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.serialization.comm.IRequestHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.edex.plugin.text.subscription.runners.ISubscribeRunner;
import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeRunner;
import com.raytheon.uf.edex.plugin.text.subscription.util.Tools;
/**
* The main class of the Subscription Manager. Receives and processes
* subscription requests from a client. Returns a response based on the result
* of processing the request.
* Thrift handler for text subscription requests
*
* <pre>
*
@ -44,45 +41,43 @@ import com.raytheon.uf.edex.plugin.text.subscription.util.Tools;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 15Dec2008 1709 MW Fegan Initial Creation. Replaces SubscribeSrv.
* May 22, 2014 2536 bclement moved from autobldsrv to edex.plugin.text
* Sep 5, 2014 2926 bclement Initial creation, replaced SubscribeManager
*
* </pre>
*
* @author mfegan
* @author bclement
* @version 1.0
*/
public class SubscriptionRequestHandler implements
IRequestHandler<SubscriptionRequest> {
private static final IUFStatusHandler logger = UFStatus
.getHandler(SubscriptionRequestHandler.class);
public class SubscribeManager {
private transient Log logger = LogFactory.getLog(getClass());
/**
* Constructor.
*
*/
public SubscribeManager() {
super();
public SubscriptionRequestHandler() {
}
/**
* Processes the subscription request.
*
* @param requestXML XML containing the request
*
* @return the response to the request
/* (non-Javadoc)
* @see com.raytheon.uf.common.serialization.comm.IRequestHandler#handleRequest(com.raytheon.uf.common.serialization.comm.IServerRequest)
*/
public Message processRequest(String requestXML) {
@Override
public Object handleRequest(SubscriptionRequest request) throws Exception {
String msg = "";
logger.debug("Processing request: " + requestXML);
List<Property> results = null;
try {
Message xml = parseMessage(requestXML);
String oper = xml.getHeader().getProperty("operation");
Message requestMsg = request.getMessage();
String oper = requestMsg.getHeader().getProperty("operation");
ISubscribeRunner runner = SubscribeRunner.getInstance(oper);
if (null == runner) {
msg = "Unable to get subscription runner for " + oper;
logger.warn(msg);
results = new ArrayList<Property>();
results.add(new Property("STDERR",msg));
results.add(new Property("STDERR", msg));
} else {
runner.initialize(xml);
runner.initialize(requestMsg);
runner.execute();
results = runner.getResults();
}
@ -90,44 +85,28 @@ public class SubscribeManager {
msg = "Unable to process message. " + e.toString();
logger.error(msg, e);
results = new ArrayList<Property>();
results.add(new Property("STDERR",msg));
results.add(new Property("STDERR", msg));
}
Message xmlMsg = createMessage(results);
return xmlMsg;
}
/**
* Creates a Message object containing the results of the the execution.
*
* @param result the result to convert to a message
* @param result
* the result to convert to a message
*
* @return message object containing the result
*/
private final Message createMessage(List<Property> result) {
Message msg = new Message();
Header h = new Header();
h.setProperties(result.toArray(new Property[] {} ));
h.setProperties(result.toArray(new Property[] {}));
msg.setHeader(h);
return msg;
}
/**
*
* @param message
* @return
* @throws Exception
*/
private final Message parseMessage(String message) throws Exception {
Object m = SerializationUtil.unmarshalFromXml(message);
if (m instanceof Message) {
for (Property property : ((Message) m).getHeader().getProperties()) {
if (!"script".equalsIgnoreCase(property.getName())) {
property.setValue(Tools.hexToAscii(property.getValue()));
}
}
return (Message)m;
}
return null;
}
}

View file

@ -24,6 +24,7 @@ import java.util.Map;
import com.raytheon.uf.common.message.Message;
import com.raytheon.uf.edex.plugin.text.subscription.runners.ASubscribeRunner;
import com.raytheon.uf.edex.plugin.text.subscription.runners.ISubscribeRunner;
import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeAddRunner;
import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeDeleteRunner;
import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeQueryRunner;
@ -45,6 +46,7 @@ import com.raytheon.uf.edex.plugin.text.subscription.runners.SubscribeUpdateRunn
* 14Nov2008 1709 MW Fegan Initial creation.
* May 22, 2014 2536 bclement moved from autobldsrv to edex.plugin.text
* removed hard coded class names
* Sep 05, 2014 2926 bclement switched map from class names to class objects
*
* </pre>
*
@ -67,17 +69,18 @@ public enum SubscribeAction {
put("query",ACTION_QUERY);
}
};
/**
* A mapping of SubscribeAction objects to implementation class names.
* A mapping of SubscribeAction objects to implementation class
*/
private static final Map<SubscribeAction, String> runners = new HashMap<SubscribeAction, String>() {
private static final Map<SubscribeAction, Class<? extends ISubscribeRunner>> runners = new HashMap<SubscribeAction, Class<? extends ISubscribeRunner>>() {
private static final long serialVersionUID = 1L;
{
put(ACTION_ADD, SubscribeAddRunner.class.getName());
put(ACTION_READ, SubscribeReadRunner.class.getName());
put(ACTION_DELETE, SubscribeDeleteRunner.class.getName());
put(ACTION_UPDATE, SubscribeUpdateRunner.class.getName());
put(ACTION_QUERY, SubscribeQueryRunner.class.getName());
put(ACTION_ADD, SubscribeAddRunner.class);
put(ACTION_READ, SubscribeReadRunner.class);
put(ACTION_DELETE, SubscribeDeleteRunner.class);
put(ACTION_UPDATE, SubscribeUpdateRunner.class);
put(ACTION_QUERY, SubscribeQueryRunner.class);
}
};
/**
@ -97,7 +100,7 @@ public enum SubscribeAction {
/**
* Returns the class name of the runner associated with the action.
*/
public final String getRunner() {
public final Class<? extends ISubscribeRunner> getRunner() {
return runners.get(this);
}
}

View file

@ -20,10 +20,9 @@
package com.raytheon.uf.edex.plugin.text.subscription.util;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
import com.raytheon.uf.common.message.Property;
/**
@ -38,6 +37,7 @@ import com.raytheon.uf.common.message.Property;
* 14Nov2008 1709 MW Fegan Initial creation.
* 14Sep2010 3944 cjeanbap Trim the newline char from value.
* May 22, 2014 2536 bclement moved from autobldsrv to edex.plugin.text
* Sep 05, 2014 2926 bclement removed hex utilities
*
* </pre>
*
@ -53,6 +53,7 @@ public final class Tools {
private Tools() {
super();
}
/**
*
* @param properties
@ -78,12 +79,4 @@ public final class Tools {
return retVal;
}
public static final String AsciiToHex(String string) {
return new HexBinaryAdapter().marshal(string.getBytes());
}
public static final String hexToAscii(String hexString) {
byte[] b = new HexBinaryAdapter().unmarshal(hexString);
return new String(b);
}
}

View file

@ -23,11 +23,15 @@ import types
import lib.CommandLine as CL
import lib.InputOutput as IO
import lib.CommHandler as CH
import lib.Message as MSG
import lib.Util as util
import conf.SMConfig as config
import collections
from ufpy import ThriftClient
from dynamicserialize.dstypes.com.raytheon.uf.common.message import Message, Header, Property
from dynamicserialize.dstypes.com.raytheon.uf.common.dataplugin.text.request import SubscriptionRequest
##############################################################################
# Class providing a command line interface to the EDEX Subscription Service
# (SubscribeSrv) end-point. The package design, is to allow this class to run
@ -267,16 +271,13 @@ class SubscriptionManager:
# 1: indicates the execution was unsuccessful
def __processRequest(self):
msg = self.__createMessage()
service = config.endpoint.get('subscribe')
connection=str(os.getenv("DEFAULT_HOST", "localhost") + ":" + os.getenv("DEFAULT_PORT", "9581"))
ch = CH.CommHandler(connection, service)
ch.process(msg)
if not ch.isGoodStatus():
util.reportHTTPResponse(ch.formatResponse())
retVal = self.__processResponse(ch.getContents())
host = os.getenv("DEFAULT_HOST", "localhost")
port = os.getenv("DEFAULT_PORT", "9581")
tClient = ThriftClient.ThriftClient(host, port)
req = SubscriptionRequest()
req.setMessage(msg)
resp = tClient.sendRequest(req)
retVal = self.__processResponse(resp)
return retVal
# Processes the request and reports the results. Data is
@ -288,14 +289,12 @@ class SubscriptionManager:
# return:
# 0 if the message contained valid results, 0 otherwise
def __processResponse(self,msg):
psr = MSG.Message(False)
psr.parse(msg)
status = 0
io = IO.InputOutput()
# process the return message
for prop in psr.getProperties():
name = prop['name']
value = prop['value']
for prop in msg.getHeader().getProperties():
name = prop.getName()
value = prop.getValue()
if name == 'STDERR':
parts = value.split(':',2)
if parts[0] == 'ERROR':
@ -312,24 +311,23 @@ class SubscriptionManager:
# return:
# the request message
def __createMessage(self):
msg = MSG.Message(True)
msg.initializeMessage(False)
multimap = collections.defaultdict(list)
for key in self.commands:
if key == 'mode':
pass
elif key == 'substitution':
dict = util.convListToDict(self.commands.get(key))
for sub in dict:
msg.addProperty(name='substitution',value=str(sub)+":"+str(dict.get(sub)))
multimap['substitution'].append(str(sub)+":"+str(dict.get(sub)))
elif key == 'update':
dict = util.convListToDict(self.commands.get(key))
for sub in dict:
msg.addProperty(name='update',value=str(sub)+":"+str(dict.get(sub)))
multimap['update'].append(str(sub)+":"+str(dict.get(sub)))
else:
msg.addProperty(name=key,value=self.commands.get(key))
multimap[key].append(self.commands.get(key))
if self.__hasScript():
msg.addProperty(name='script',value=self.script)
return msg.getXML()
multimap['script'].append(self.script)
return Message(header=Header(multimap=multimap))
# main class method. Performs the subscription update request.
#
@ -361,4 +359,4 @@ class SubscriptionManager:
if __name__ == "__main__":
ue = SubscriptionManager()
status = ue.execute()
exit(status)
exit(status)

View file

@ -26,7 +26,8 @@ __all__ = [
'grid',
'level',
'message',
'radar'
'radar',
'text'
]

View file

@ -21,7 +21,8 @@
# File auto-generated by PythonFileGenerator
__all__ = [
'dbsrv'
'dbsrv',
'request'
]

View file

@ -0,0 +1,39 @@
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
# File auto-generated against equivalent DynamicSerialize Java class
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# Sep 05, 2014 bclement Generated
class SubscriptionRequest(object):
def __init__(self):
self.message = None
def getMessage(self):
return self.message
def setMessage(self, message):
self.message = message

View file

@ -0,0 +1,28 @@
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
# File auto-generated by PythonFileGenerator
__all__ = [
'SubscriptionRequest'
]
from SubscriptionRequest import SubscriptionRequest