Former-commit-id:133dc97f67
[formerlya02aeb236c
] [formerly9f19e3f712
] [formerly06a8b51d6d
[formerly9f19e3f712
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]] Former-commit-id:06a8b51d6d
Former-commit-id:377dcd10b9
[formerly3360eb6c5f
] Former-commit-id:8e80217e59
775 lines
28 KiB
Python
775 lines
28 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
from qpid.client import Client, Closed
|
|
from qpid.queue import Empty
|
|
from qpid.datatypes import Message, RangedSet
|
|
from qpid.session import SessionException
|
|
from qpid.testlib import TestBase010
|
|
from qpid.compat import set
|
|
from struct import pack, unpack
|
|
from time import sleep
|
|
|
|
class DtxTests(TestBase010):
|
|
"""
|
|
Tests for the amqp dtx related classes.
|
|
|
|
Tests of the form test_simple_xxx test the basic transactional
|
|
behaviour. The approach here is to 'swap' a message from one queue
|
|
to another by consuming and re-publishing in the same
|
|
transaction. That transaction is then completed in different ways
|
|
and the appropriate result verified.
|
|
|
|
The other tests enforce more specific rules and behaviour on a
|
|
per-method or per-field basis.
|
|
"""
|
|
|
|
XA_RBROLLBACK = 1
|
|
XA_RBTIMEOUT = 2
|
|
XA_OK = 0
|
|
tx_counter = 0
|
|
|
|
def reset_channel(self):
|
|
self.session.close()
|
|
self.session = self.conn.session("dtx-session", 1)
|
|
|
|
def test_simple_commit(self):
|
|
"""
|
|
Test basic one-phase commit behaviour.
|
|
"""
|
|
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
|
|
session = self.session
|
|
tx = self.xid("my-xid")
|
|
self.txswap(tx, "commit")
|
|
|
|
#neither queue should have any messages accessible
|
|
self.assertMessageCount(0, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
|
|
#commit
|
|
self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status)
|
|
|
|
#should close and reopen session to ensure no unacked messages are held
|
|
self.reset_channel()
|
|
|
|
#check result
|
|
self.assertMessageCount(0, "queue-a")
|
|
self.assertMessageCount(1, "queue-b")
|
|
self.assertMessageId("commit", "queue-b")
|
|
|
|
def test_simple_prepare_commit(self):
|
|
"""
|
|
Test basic two-phase commit behaviour.
|
|
"""
|
|
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
|
|
session = self.session
|
|
tx = self.xid("my-xid")
|
|
self.txswap(tx, "prepare-commit")
|
|
|
|
#prepare
|
|
self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
|
|
|
|
#neither queue should have any messages accessible
|
|
self.assertMessageCount(0, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
|
|
#commit
|
|
self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status)
|
|
|
|
self.reset_channel()
|
|
|
|
#check result
|
|
self.assertMessageCount(0, "queue-a")
|
|
self.assertMessageCount(1, "queue-b")
|
|
self.assertMessageId("prepare-commit", "queue-b")
|
|
|
|
|
|
def test_simple_rollback(self):
|
|
"""
|
|
Test basic rollback behaviour.
|
|
"""
|
|
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
|
|
session = self.session
|
|
tx = self.xid("my-xid")
|
|
self.txswap(tx, "rollback")
|
|
|
|
#neither queue should have any messages accessible
|
|
self.assertMessageCount(0, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
|
|
#rollback
|
|
self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
|
|
|
|
self.reset_channel()
|
|
|
|
#check result
|
|
self.assertMessageCount(1, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
self.assertMessageId("rollback", "queue-a")
|
|
|
|
def test_simple_prepare_rollback(self):
|
|
"""
|
|
Test basic rollback behaviour after the transaction has been prepared.
|
|
"""
|
|
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
|
|
session = self.session
|
|
tx = self.xid("my-xid")
|
|
self.txswap(tx, "prepare-rollback")
|
|
|
|
#prepare
|
|
self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
|
|
|
|
#neither queue should have any messages accessible
|
|
self.assertMessageCount(0, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
|
|
#rollback
|
|
self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
|
|
|
|
self.reset_channel()
|
|
|
|
#check result
|
|
self.assertMessageCount(1, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
self.assertMessageId("prepare-rollback", "queue-a")
|
|
|
|
def test_select_required(self):
|
|
"""
|
|
check that an error is flagged if select is not issued before
|
|
start or end
|
|
"""
|
|
session = self.session
|
|
tx = self.xid("dummy")
|
|
try:
|
|
session.dtx_start(xid=tx)
|
|
|
|
#if we get here we have failed, but need to do some cleanup:
|
|
session.dtx_end(xid=tx)
|
|
session.dtx_rollback(xid=tx)
|
|
self.fail("Session not selected for use with dtx, expected exception!")
|
|
except SessionException, e:
|
|
self.assertEquals(503, e.args[0].error_code)
|
|
|
|
def test_start_already_known(self):
|
|
"""
|
|
Verify that an attempt to start an association with a
|
|
transaction that is already known is not allowed (unless the
|
|
join flag is set).
|
|
"""
|
|
#create two sessions on different connection & select them for use with dtx:
|
|
session1 = self.session
|
|
session1.dtx_select()
|
|
|
|
other = self.connect()
|
|
session2 = other.session("other", 0)
|
|
session2.dtx_select()
|
|
|
|
#create a xid
|
|
tx = self.xid("dummy")
|
|
#start work on one session under that xid:
|
|
session1.dtx_start(xid=tx)
|
|
#then start on the other without the join set
|
|
failed = False
|
|
try:
|
|
session2.dtx_start(xid=tx)
|
|
except SessionException, e:
|
|
failed = True
|
|
error = e
|
|
|
|
#cleanup:
|
|
if not failed:
|
|
session2.dtx_end(xid=tx)
|
|
other.close()
|
|
session1.dtx_end(xid=tx)
|
|
session1.dtx_rollback(xid=tx)
|
|
|
|
#verification:
|
|
if failed: self.assertEquals(530, error.args[0].error_code)
|
|
else: self.fail("Xid already known, expected exception!")
|
|
|
|
def test_forget_xid_on_completion(self):
|
|
"""
|
|
Verify that a xid is 'forgotten' - and can therefore be used
|
|
again - once it is completed.
|
|
"""
|
|
#do some transactional work & complete the transaction
|
|
self.test_simple_commit()
|
|
# session has been reset, so reselect for use with dtx
|
|
self.session.dtx_select()
|
|
|
|
#start association for the same xid as the previously completed txn
|
|
tx = self.xid("my-xid")
|
|
self.session.dtx_start(xid=tx)
|
|
self.session.dtx_end(xid=tx)
|
|
self.session.dtx_rollback(xid=tx)
|
|
|
|
def test_start_join_and_resume(self):
|
|
"""
|
|
Ensure the correct error is signalled when both the join and
|
|
resume flags are set on starting an association between a
|
|
session and a transcation.
|
|
"""
|
|
session = self.session
|
|
session.dtx_select()
|
|
tx = self.xid("dummy")
|
|
try:
|
|
session.dtx_start(xid=tx, join=True, resume=True)
|
|
#failed, but need some cleanup:
|
|
session.dtx_end(xid=tx)
|
|
session.dtx_rollback(xid=tx)
|
|
self.fail("Join and resume both set, expected exception!")
|
|
except SessionException, e:
|
|
self.assertEquals(503, e.args[0].error_code)
|
|
|
|
def test_start_join(self):
|
|
"""
|
|
Verify 'join' behaviour, where a session is associated with a
|
|
transaction that is already associated with another session.
|
|
"""
|
|
guard = self.keepQueuesAlive(["one", "two"])
|
|
#create two sessions & select them for use with dtx:
|
|
session1 = self.session
|
|
session1.dtx_select()
|
|
|
|
session2 = self.conn.session("second", 2)
|
|
session2.dtx_select()
|
|
|
|
#setup
|
|
session1.queue_declare(queue="one", auto_delete=True)
|
|
session1.queue_declare(queue="two", auto_delete=True)
|
|
session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage"))
|
|
session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage"))
|
|
|
|
#create a xid
|
|
tx = self.xid("dummy")
|
|
#start work on one session under that xid:
|
|
session1.dtx_start(xid=tx)
|
|
#then start on the other with the join flag set
|
|
session2.dtx_start(xid=tx, join=True)
|
|
|
|
#do work through each session
|
|
self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two'
|
|
self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one'
|
|
|
|
#mark end on both sessions
|
|
session1.dtx_end(xid=tx)
|
|
session2.dtx_end(xid=tx)
|
|
|
|
#commit and check
|
|
session1.dtx_commit(xid=tx, one_phase=True)
|
|
self.assertMessageCount(1, "one")
|
|
self.assertMessageCount(1, "two")
|
|
self.assertMessageId("a", "two")
|
|
self.assertMessageId("b", "one")
|
|
|
|
|
|
def test_suspend_resume(self):
|
|
"""
|
|
Test suspension and resumption of an association
|
|
"""
|
|
session = self.session
|
|
session.dtx_select()
|
|
|
|
#setup
|
|
session.queue_declare(queue="one", exclusive=True, auto_delete=True)
|
|
session.queue_declare(queue="two", exclusive=True, auto_delete=True)
|
|
session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
|
|
session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
|
|
|
|
tx = self.xid("dummy")
|
|
|
|
session.dtx_start(xid=tx)
|
|
self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
|
|
session.dtx_end(xid=tx, suspend=True)
|
|
|
|
session.dtx_start(xid=tx, resume=True)
|
|
self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
|
|
session.dtx_end(xid=tx)
|
|
|
|
#commit and check
|
|
session.dtx_commit(xid=tx, one_phase=True)
|
|
self.assertMessageCount(1, "one")
|
|
self.assertMessageCount(1, "two")
|
|
self.assertMessageId("a", "two")
|
|
self.assertMessageId("b", "one")
|
|
|
|
def test_suspend_start_end_resume(self):
|
|
"""
|
|
Test suspension and resumption of an association with work
|
|
done on another transaction when the first transaction is
|
|
suspended
|
|
"""
|
|
session = self.session
|
|
session.dtx_select()
|
|
|
|
#setup
|
|
session.queue_declare(queue="one", exclusive=True, auto_delete=True)
|
|
session.queue_declare(queue="two", exclusive=True, auto_delete=True)
|
|
session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
|
|
session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
|
|
|
|
tx = self.xid("dummy")
|
|
|
|
session.dtx_start(xid=tx)
|
|
self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
|
|
session.dtx_end(xid=tx, suspend=True)
|
|
|
|
session.dtx_start(xid=tx, resume=True)
|
|
self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
|
|
session.dtx_end(xid=tx)
|
|
|
|
#commit and check
|
|
session.dtx_commit(xid=tx, one_phase=True)
|
|
self.assertMessageCount(1, "one")
|
|
self.assertMessageCount(1, "two")
|
|
self.assertMessageId("a", "two")
|
|
self.assertMessageId("b", "one")
|
|
|
|
def test_end_suspend_and_fail(self):
|
|
"""
|
|
Verify that the correct error is signalled if the suspend and
|
|
fail flag are both set when disassociating a transaction from
|
|
the session
|
|
"""
|
|
session = self.session
|
|
session.dtx_select()
|
|
tx = self.xid("suspend_and_fail")
|
|
session.dtx_start(xid=tx)
|
|
try:
|
|
session.dtx_end(xid=tx, suspend=True, fail=True)
|
|
self.fail("Suspend and fail both set, expected exception!")
|
|
except SessionException, e:
|
|
self.assertEquals(503, e.args[0].error_code)
|
|
|
|
#cleanup
|
|
other = self.connect()
|
|
session = other.session("cleanup", 1)
|
|
session.dtx_rollback(xid=tx)
|
|
session.close()
|
|
other.close()
|
|
|
|
|
|
def test_end_unknown_xid(self):
|
|
"""
|
|
Verifies that the correct exception is thrown when an attempt
|
|
is made to end the association for a xid not previously
|
|
associated with the session
|
|
"""
|
|
session = self.session
|
|
session.dtx_select()
|
|
tx = self.xid("unknown-xid")
|
|
try:
|
|
session.dtx_end(xid=tx)
|
|
self.fail("Attempted to end association with unknown xid, expected exception!")
|
|
except SessionException, e:
|
|
self.assertEquals(409, e.args[0].error_code)
|
|
|
|
def test_end(self):
|
|
"""
|
|
Verify that the association is terminated by end and subsequent
|
|
operations are non-transactional
|
|
"""
|
|
guard = self.keepQueuesAlive(["tx-queue"])
|
|
session = self.conn.session("alternate", 1)
|
|
session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
|
|
|
|
#publish a message under a transaction
|
|
session.dtx_select()
|
|
tx = self.xid("dummy")
|
|
session.dtx_start(xid=tx)
|
|
session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage"))
|
|
session.dtx_end(xid=tx)
|
|
|
|
#now that association with txn is ended, publish another message
|
|
session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage"))
|
|
|
|
#check the second message is available, but not the first
|
|
self.assertMessageCount(1, "tx-queue")
|
|
self.subscribe(session, queue="tx-queue", destination="results")
|
|
msg = session.incoming("results").get(timeout=1)
|
|
self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id'))
|
|
session.message_cancel(destination="results")
|
|
#ack the message then close the session
|
|
session.message_accept(RangedSet(msg.id))
|
|
session.close()
|
|
|
|
session = self.session
|
|
#commit the transaction and check that the first message (and
|
|
#only the first message) is then delivered
|
|
session.dtx_commit(xid=tx, one_phase=True)
|
|
self.assertMessageCount(1, "tx-queue")
|
|
self.assertMessageId("one", "tx-queue")
|
|
|
|
def test_invalid_commit_one_phase_true(self):
|
|
"""
|
|
Test that a commit with one_phase = True is rejected if the
|
|
transaction in question has already been prepared.
|
|
"""
|
|
other = self.connect()
|
|
tester = other.session("tester", 1)
|
|
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
tester.dtx_select()
|
|
tx = self.xid("dummy")
|
|
tester.dtx_start(xid=tx)
|
|
tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
|
|
tester.dtx_end(xid=tx)
|
|
tester.dtx_prepare(xid=tx)
|
|
failed = False
|
|
try:
|
|
tester.dtx_commit(xid=tx, one_phase=True)
|
|
except SessionException, e:
|
|
failed = True
|
|
error = e
|
|
|
|
if failed:
|
|
self.session.dtx_rollback(xid=tx)
|
|
self.assertEquals(409, error.args[0].error_code)
|
|
else:
|
|
tester.close()
|
|
other.close()
|
|
self.fail("Invalid use of one_phase=True, expected exception!")
|
|
|
|
def test_invalid_commit_one_phase_false(self):
|
|
"""
|
|
Test that a commit with one_phase = False is rejected if the
|
|
transaction in question has not yet been prepared.
|
|
"""
|
|
other = self.connect()
|
|
tester = other.session("tester", 1)
|
|
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
tester.dtx_select()
|
|
tx = self.xid("dummy")
|
|
tester.dtx_start(xid=tx)
|
|
tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
|
|
tester.dtx_end(xid=tx)
|
|
failed = False
|
|
try:
|
|
tester.dtx_commit(xid=tx, one_phase=False)
|
|
except SessionException, e:
|
|
failed = True
|
|
error = e
|
|
|
|
if failed:
|
|
self.session.dtx_rollback(xid=tx)
|
|
self.assertEquals(409, error.args[0].error_code)
|
|
else:
|
|
tester.close()
|
|
other.close()
|
|
self.fail("Invalid use of one_phase=False, expected exception!")
|
|
|
|
def test_invalid_commit_not_ended(self):
|
|
"""
|
|
Test that a commit fails if the xid is still associated with a session.
|
|
"""
|
|
other = self.connect()
|
|
tester = other.session("tester", 1)
|
|
self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
self.session.dtx_select()
|
|
tx = self.xid("dummy")
|
|
self.session.dtx_start(xid=tx)
|
|
self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
|
|
|
|
failed = False
|
|
try:
|
|
tester.dtx_commit(xid=tx, one_phase=False)
|
|
except SessionException, e:
|
|
failed = True
|
|
error = e
|
|
|
|
if failed:
|
|
self.session.dtx_end(xid=tx)
|
|
self.session.dtx_rollback(xid=tx)
|
|
self.assertEquals(409, error.args[0].error_code)
|
|
else:
|
|
tester.close()
|
|
other.close()
|
|
self.fail("Commit should fail as xid is still associated!")
|
|
|
|
def test_invalid_rollback_not_ended(self):
|
|
"""
|
|
Test that a rollback fails if the xid is still associated with a session.
|
|
"""
|
|
other = self.connect()
|
|
tester = other.session("tester", 1)
|
|
self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
self.session.dtx_select()
|
|
tx = self.xid("dummy")
|
|
self.session.dtx_start(xid=tx)
|
|
self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
|
|
|
|
failed = False
|
|
try:
|
|
tester.dtx_rollback(xid=tx)
|
|
except SessionException, e:
|
|
failed = True
|
|
error = e
|
|
|
|
if failed:
|
|
self.session.dtx_end(xid=tx)
|
|
self.session.dtx_rollback(xid=tx)
|
|
self.assertEquals(409, error.args[0].error_code)
|
|
else:
|
|
tester.close()
|
|
other.close()
|
|
self.fail("Rollback should fail as xid is still associated!")
|
|
|
|
|
|
def test_invalid_prepare_not_ended(self):
|
|
"""
|
|
Test that a prepare fails if the xid is still associated with a session.
|
|
"""
|
|
other = self.connect()
|
|
tester = other.session("tester", 1)
|
|
self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
self.session.dtx_select()
|
|
tx = self.xid("dummy")
|
|
self.session.dtx_start(xid=tx)
|
|
self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
|
|
|
|
failed = False
|
|
try:
|
|
tester.dtx_prepare(xid=tx)
|
|
except SessionException, e:
|
|
failed = True
|
|
error = e
|
|
|
|
if failed:
|
|
self.session.dtx_end(xid=tx)
|
|
self.session.dtx_rollback(xid=tx)
|
|
self.assertEquals(409, error.args[0].error_code)
|
|
else:
|
|
tester.close()
|
|
other.close()
|
|
self.fail("Rollback should fail as xid is still associated!")
|
|
|
|
def test_implicit_end(self):
|
|
"""
|
|
Test that an association is implicitly ended when the session
|
|
is closed (whether by exception or explicit client request)
|
|
and the transaction in question is marked as rollback only.
|
|
"""
|
|
session1 = self.session
|
|
session2 = self.conn.session("other", 2)
|
|
|
|
#setup:
|
|
session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
|
|
tx = self.xid("dummy")
|
|
|
|
session2.dtx_select()
|
|
session2.dtx_start(xid=tx)
|
|
session2.message_subscribe(queue="dummy", destination="dummy")
|
|
session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1)
|
|
session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
|
|
msg = session2.incoming("dummy").get(timeout=1)
|
|
session2.message_accept(RangedSet(msg.id))
|
|
session2.message_cancel(destination="dummy")
|
|
session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
|
|
session2.close()
|
|
|
|
self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
|
|
session1.dtx_rollback(xid=tx)
|
|
|
|
def test_get_timeout(self):
|
|
"""
|
|
Check that get-timeout returns the correct value, (and that a
|
|
transaction with a timeout can complete normally)
|
|
"""
|
|
session = self.session
|
|
tx = self.xid("dummy")
|
|
|
|
session.dtx_select()
|
|
session.dtx_start(xid=tx)
|
|
self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout)
|
|
session.dtx_set_timeout(xid=tx, timeout=60)
|
|
self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
|
|
self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
|
|
self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
|
|
|
|
def test_set_timeout(self):
|
|
"""
|
|
Test the timeout of a transaction results in the expected
|
|
behaviour
|
|
"""
|
|
|
|
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
|
|
#open new session to allow self.session to be used in checking the queue
|
|
session = self.conn.session("worker", 1)
|
|
#setup:
|
|
tx = self.xid("dummy")
|
|
session.queue_declare(queue="queue-a", auto_delete=True)
|
|
session.queue_declare(queue="queue-b", auto_delete=True)
|
|
session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage"))
|
|
|
|
session.dtx_select()
|
|
session.dtx_start(xid=tx)
|
|
self.swap(session, "queue-a", "queue-b")
|
|
session.dtx_set_timeout(xid=tx, timeout=2)
|
|
sleep(3)
|
|
#check that the work has been rolled back already
|
|
self.assertMessageCount(1, "queue-a")
|
|
self.assertMessageCount(0, "queue-b")
|
|
self.assertMessageId("timeout", "queue-a")
|
|
#check the correct codes are returned when we try to complete the txn
|
|
self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
|
|
self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
|
|
|
|
|
|
|
|
def test_recover(self):
|
|
"""
|
|
Test basic recover behaviour
|
|
"""
|
|
session = self.session
|
|
|
|
session.dtx_select()
|
|
session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
|
|
|
|
prepared = []
|
|
for i in range(1, 10):
|
|
tx = self.xid("tx%s" % (i))
|
|
session.dtx_start(xid=tx)
|
|
session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i)))
|
|
session.dtx_end(xid=tx)
|
|
if i in [2, 5, 6, 8]:
|
|
session.dtx_prepare(xid=tx)
|
|
prepared.append(tx)
|
|
else:
|
|
session.dtx_rollback(xid=tx)
|
|
|
|
xids = session.dtx_recover().in_doubt
|
|
|
|
#rollback the prepared transactions returned by recover
|
|
for x in xids:
|
|
session.dtx_rollback(xid=x)
|
|
|
|
#validate against the expected list of prepared transactions
|
|
actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
|
|
expected = set([x.global_id for x in prepared])
|
|
intersection = actual.intersection(expected)
|
|
|
|
if intersection != expected:
|
|
missing = expected.difference(actual)
|
|
extra = actual.difference(expected)
|
|
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
|
|
|
|
def test_bad_resume(self):
|
|
"""
|
|
Test that a resume on a session not selected for use with dtx fails
|
|
"""
|
|
session = self.session
|
|
try:
|
|
session.dtx_start(resume=True)
|
|
except SessionException, e:
|
|
self.assertEquals(503, e.args[0].error_code)
|
|
|
|
def test_prepare_unknown(self):
|
|
session = self.session
|
|
try:
|
|
session.dtx_prepare(xid=self.xid("unknown"))
|
|
except SessionException, e:
|
|
self.assertEquals(404, e.args[0].error_code)
|
|
|
|
def test_commit_unknown(self):
|
|
session = self.session
|
|
try:
|
|
session.dtx_commit(xid=self.xid("unknown"))
|
|
except SessionException, e:
|
|
self.assertEquals(404, e.args[0].error_code)
|
|
|
|
def test_rollback_unknown(self):
|
|
session = self.session
|
|
try:
|
|
session.dtx_rollback(xid=self.xid("unknown"))
|
|
except SessionException, e:
|
|
self.assertEquals(404, e.args[0].error_code)
|
|
|
|
def test_get_timeout_unknown(self):
|
|
session = self.session
|
|
try:
|
|
session.dtx_get_timeout(xid=self.xid("unknown"))
|
|
except SessionException, e:
|
|
self.assertEquals(404, e.args[0].error_code)
|
|
|
|
def xid(self, txid):
|
|
DtxTests.tx_counter += 1
|
|
branchqual = "v%s" % DtxTests.tx_counter
|
|
return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
|
|
|
|
def txswap(self, tx, id):
|
|
session = self.session
|
|
#declare two queues:
|
|
session.queue_declare(queue="queue-a", auto_delete=True)
|
|
session.queue_declare(queue="queue-b", auto_delete=True)
|
|
|
|
#put message with specified id on one queue:
|
|
dp=session.delivery_properties(routing_key="queue-a")
|
|
mp=session.message_properties(correlation_id=id)
|
|
session.message_transfer(message=Message(dp, mp, "DtxMessage"))
|
|
|
|
#start the transaction:
|
|
session.dtx_select()
|
|
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
|
|
|
|
#'swap' the message from one queue to the other, under that transaction:
|
|
self.swap(self.session, "queue-a", "queue-b")
|
|
|
|
#mark the end of the transactional work:
|
|
self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
|
|
|
|
def swap(self, session, src, dest):
|
|
#consume from src:
|
|
session.message_subscribe(destination="temp-swap", queue=src)
|
|
session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1)
|
|
session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
|
|
msg = session.incoming("temp-swap").get(timeout=1)
|
|
session.message_cancel(destination="temp-swap")
|
|
session.message_accept(RangedSet(msg.id))
|
|
#todo: also complete at this point?
|
|
|
|
#re-publish to dest:
|
|
dp=session.delivery_properties(routing_key=dest)
|
|
mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
|
|
session.message_transfer(message=Message(dp, mp, msg.body))
|
|
|
|
def assertMessageCount(self, expected, queue):
|
|
self.assertEqual(expected, self.session.queue_query(queue=queue).message_count)
|
|
|
|
def assertMessageId(self, expected, queue):
|
|
self.session.message_subscribe(queue=queue, destination="results")
|
|
self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1)
|
|
self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
|
|
self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
|
|
self.session.message_cancel(destination="results")
|
|
|
|
def getMessageProperty(self, msg, prop):
|
|
for h in msg.headers:
|
|
if hasattr(h, prop): return getattr(h, prop)
|
|
return None
|
|
|
|
def keepQueuesAlive(self, names):
|
|
session = self.conn.session("nasty", 99)
|
|
for n in names:
|
|
session.queue_declare(queue=n, auto_delete=True)
|
|
session.message_subscribe(destination=n, queue=n)
|
|
return session
|
|
|
|
def createMessage(self, session, key, id, body):
|
|
dp=session.delivery_properties(routing_key=key)
|
|
mp=session.message_properties(correlation_id=id)
|
|
session.message_transfer(message=Message(dp, mp, body))
|