Former-commit-id:133dc97f67
[formerlya02aeb236c
] [formerly9f19e3f712
] [formerly133dc97f67
[formerlya02aeb236c
] [formerly9f19e3f712
] [formerly06a8b51d6d
[formerly9f19e3f712
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]]] Former-commit-id:06a8b51d6d
Former-commit-id:9bb8decbcf
[formerly8e80217e59
] [formerly377dcd10b9
[formerly3360eb6c5f
]] Former-commit-id:377dcd10b9
Former-commit-id:e2ecdcfe33
216 lines
8.6 KiB
Python
Executable file
216 lines
8.6 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import time, string, traceback
|
|
from brokertest import *
|
|
from qpid.messaging import *
|
|
|
|
|
|
try:
|
|
import java.lang.System
|
|
_cp = java.lang.System.getProperty("java.class.path");
|
|
except ImportError:
|
|
_cp = checkenv("QP_CP")
|
|
|
|
# The base test case has support for launching the genric
|
|
# receiver and sender through the TestLauncher with all the options.
|
|
#
|
|
class JavaClientTest(BrokerTest):
|
|
"""Base Case for Java Test cases"""
|
|
|
|
client_class = "org.apache.qpid.testkit.TestLauncher"
|
|
|
|
# currently there is no transparent reconnection.
|
|
# temp hack: just creating the queue here and closing it.
|
|
def start_error_watcher(self,broker=None):
|
|
ssn = broker.connect().session()
|
|
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
|
|
ssn.close()
|
|
|
|
def client(self,**options):
|
|
cmd = ["java","-cp",_cp]
|
|
cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
|
|
cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
|
|
cmd += ["-Dport=" + str(options.get("port",5672))]
|
|
cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
|
|
cmd += ["-Dssn_count=" + str(options.get("ssn_count",1))]
|
|
cmd += ["-Dqueue_name=" + options.get("queue_name","queue")]
|
|
cmd += ["-Dexchange_name=" + options.get("exchange_name","amq.direct")]
|
|
cmd += ["-Drouting_key=" + options.get("routing_key","routing_key")]
|
|
cmd += ["-Dunique_dests=" + str(options.get("unique_dests",True))]
|
|
cmd += ["-Ddurable=" + str(options.get("durable",False))]
|
|
cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
|
|
cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
|
|
cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
|
|
cmd += ["-Dsender=" + str(options.get("sender",False))]
|
|
cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
|
|
cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
|
|
cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))]
|
|
cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
|
|
cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
|
|
cmd += ["-Dreliability=" + options.get("reliability", "exactly_once")]
|
|
cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
|
|
cmd += [self.client_class]
|
|
|
|
print str(options.get("port",5672))
|
|
return cmd
|
|
|
|
# currently there is no transparent reconnection.
|
|
# temp hack: just creating a receiver and closing session soon after.
|
|
def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
|
|
ssn = broker.connect().session()
|
|
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
|
|
i = run_time/error_ck_freq
|
|
for j in range(i):
|
|
try:
|
|
m = err_watcher.fetch(timeout=error_ck_freq)
|
|
print "Java process notified of an error"
|
|
print self.check_for_error(m)
|
|
except messaging.Empty, e:
|
|
pass # do nothing
|
|
ssn.close()
|
|
|
|
def check_for_error(self,msg):
|
|
raise Exception("Error:%s \nTime:%s\nTrace:%s\n" %
|
|
(msg.properties.get("desc"),
|
|
msg.properties.get("time"),
|
|
msg.properties.get("exception-trace")
|
|
))
|
|
|
|
def terminate_and_capture_logs(self,popen, process_name):
|
|
if popen.is_running():
|
|
popen.terminate()
|
|
log = os.path.join(self.dir, process_name+".out")
|
|
f = open(log, 'w')
|
|
f.write(popen.stdout.read())
|
|
f.close()
|
|
|
|
log = os.path.join(self.dir, process_name+".err")
|
|
f = open(log, 'w')
|
|
f.write(popen.stderr.read())
|
|
f.close()
|
|
|
|
def verify(self, receiver,sender):
|
|
sender_running = receiver.is_running()
|
|
receiver_running = sender.is_running()
|
|
|
|
self.terminate_and_capture_logs(receiver,"receiver")
|
|
self.terminate_and_capture_logs(sender,"sender")
|
|
|
|
self.assertTrue(receiver_running,"Receiver has exited prematually")
|
|
self.assertTrue(sender_running,"Sender has exited prematually")
|
|
|
|
|
|
class ConcurrencyTest(JavaClientTest):
|
|
"""A concurrency test suite for the JMS client"""
|
|
|
|
def test_multiplexing_con(self):
|
|
"""Tests multiple sessions on a single connection"""
|
|
|
|
cluster = Cluster(self, 2)
|
|
p = cluster[0].port()
|
|
|
|
self.start_error_watcher(broker=cluster[0])
|
|
|
|
receiver = self.popen(self.client(receiver=True,
|
|
ssn_count=25,
|
|
port=p,
|
|
test_name=self.id()),
|
|
expect=EXPECT_EXIT_FAIL)
|
|
|
|
sender = self.popen(self.client(sender=True,
|
|
ssn_count=25,
|
|
port=p,
|
|
test_name=self.id()),
|
|
expect=EXPECT_EXIT_FAIL)
|
|
|
|
self.monitor_clients(broker=cluster[0],run_time=60)
|
|
self.verify(receiver,sender)
|
|
|
|
|
|
def test_multiplexing_con_tx(self):
|
|
"""Tests multiple transacted sessions on a single connection"""
|
|
|
|
cluster = Cluster(self,2)
|
|
ssn = cluster[0].connect().session()
|
|
p = cluster[0].port()
|
|
|
|
self.start_error_watcher(broker=cluster[0])
|
|
|
|
receiver = self.popen(self.client(receiver=True,
|
|
ssn_count=25,
|
|
port=p,
|
|
transacted=True,
|
|
test_name=self.id()),
|
|
expect=EXPECT_EXIT_FAIL)
|
|
|
|
sender = self.popen(self.client(sender=True,
|
|
ssn_count=25,
|
|
port=p,
|
|
transacted=True,
|
|
test_name=self.id()),
|
|
expect=EXPECT_EXIT_FAIL)
|
|
|
|
self.monitor_clients(broker=cluster[0],run_time=60)
|
|
ssn.close();
|
|
self.verify(receiver,sender)
|
|
|
|
class SoakTest(JavaClientTest):
|
|
"""A soak test suite for the JMS client"""
|
|
|
|
def test_failover(self):
|
|
cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
|
|
p = cluster[0].port()
|
|
self.start_error_watcher(broker=cluster[0])
|
|
receiver = self.popen(self.client(receiver=True,
|
|
ssn_count=1,
|
|
port=p,
|
|
reliability="at_least_once",
|
|
test_name=self.id()),
|
|
expect=EXPECT_EXIT_FAIL)
|
|
|
|
sender = self.popen(self.client(sender=True,
|
|
ssn_count=1,
|
|
port=p,
|
|
reliability="at_least_once",
|
|
test_name=self.id()),
|
|
expect=EXPECT_EXIT_FAIL)
|
|
|
|
# grace period for java clients to get the failover properly setup.
|
|
time.sleep(30)
|
|
error_msg=None
|
|
# Kill original brokers, start new ones.
|
|
try:
|
|
for i in range(4):
|
|
cluster[i].kill()
|
|
b=cluster.start()
|
|
self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
|
|
except ConnectError, e1:
|
|
error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
|
|
|
|
except SessionError, e2:
|
|
error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
|
|
|
|
# verify also captures out/err streams
|
|
self.verify(receiver,sender)
|
|
if error_msg:
|
|
raise Exception(error_msg)
|
|
|