601 lines
20 KiB
Python
601 lines
20 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import unittest
|
|
from qpid.codec import Codec
|
|
from qpid.spec import load
|
|
from cStringIO import StringIO
|
|
from qpid.reference import ReferenceId
|
|
|
|
__doc__ = """
|
|
|
|
This is a unit test script for qpid/codec.py
|
|
|
|
It can be run standalone or as part of the existing test framework.
|
|
|
|
To run standalone:
|
|
-------------------
|
|
|
|
Place in the qpid/python/tests/ directory and type...
|
|
|
|
python codec.py
|
|
|
|
A brief output will be printed on screen. The verbose output will be placed in a file called
|
|
codec_unit_test_output.txt. [TODO: make this filename configurable]
|
|
|
|
To run as part of the existing test framework:
|
|
-----------------------------------------------
|
|
|
|
python run-tests tests.codec
|
|
|
|
Change History:
|
|
-----------------
|
|
Jimmy John 05/19/2007 Initial draft
|
|
Jimmy John 05/22/2007 Implemented comments by Rafael Schloming
|
|
|
|
|
|
"""
|
|
|
|
from qpid_config import amqp_spec_0_8
|
|
# Spec object loaded once at import time from the amqp_spec_0_8 setting in
# qpid_config; it is handed to every Codec constructed in this module.
SPEC = load(amqp_spec_0_8)
|
|
|
|
# --------------------------------------
|
|
# --------------------------------------
|
|
class BaseDataTypes(unittest.TestCase):
    """
    Common fixture and helper methods shared by the codec test cases.

    Every test runs against its own Codec, which is bound to a fresh
    in-memory stream in setUp and released again in tearDown.
    """

    def setUp(self):
        """
        Build the Codec under test on top of a new, empty in-memory stream.
        """
        stream = StringIO()
        self.codec = Codec(stream, SPEC)

    def tearDown(self):
        """
        Flush and then close whichever stream the codec currently holds.
        """
        stream = self.codec.stream
        stream.flush()
        stream.close()

    def callFunc(self, functionName, *args):
        """
        Call the codec method called functionName and return the stream contents.

        Only the first positional argument (args[0]) is forwarded to the
        codec method; any further arguments are ignored.
        """
        encoder = getattr(self.codec, functionName)
        encoder(args[0])
        return self.codec.stream.getvalue()

    def readFunc(self, functionName, *args):
        """
        Replace the codec's stream with one holding args[0], then call the
        zero-argument codec method called functionName and return its result.
        """
        self.codec.stream = StringIO(args[0])
        decoder = getattr(self.codec, functionName)
        return decoder()
|
|
|
|
|
|
# ----------------------------------------
|
|
# ----------------------------------------
|
|
class IntegerTestCase(BaseDataTypes):
    """
    Encode/decode tests for the unsigned integer types: octet (8 bits),
    short (16 bits), long (32 bits) and long long (64 bits).

    Each width gets the same five checks: encode the value 2, reject the
    first value above the range, reject -1, encode 2.5 expecting the same
    bytes as 2, and decode the encoded bytes back to 2.

    The assertions use assertEqual/assertRaises rather than the deprecated
    failUnlessEqual/failUnlessRaises aliases.
    """

    def __init__(self, *args):
        """
        Set the integer under test and its expected big-endian encodings.

        args are passed straight through to BaseDataTypes.__init__.
        """
        BaseDataTypes.__init__(self, *args)
        self.const_integer = 2
        self.const_integer_octet_encoded = '\x02'
        self.const_integer_short_encoded = '\x00\x02'
        self.const_integer_long_encoded = '\x00\x00\x00\x02'
        self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02'

    # ------------------------- #
    # Unsigned Octet - 8 bits   #
    # ------------------------- #

    def test_unsigned_octet(self):
        """
        encode_octet(2) must produce the single byte 0x02
        """
        self.assertEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...')

    def test_octet_out_of_upper_range(self):
        """
        256 is one past the largest octet and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_octet, 256)

    def test_uoctet_out_of_lower_range(self):
        """
        -1 is below the unsigned range and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_octet, -1)

    def test_uoctet_with_fraction(self):
        """
        encoding 2.5 is expected to give the same byte as encoding 2
        """
        self.assertEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...')

    def test_unsigned_octet_decode(self):
        """
        decode_octet of the byte 0x02 must return 2
        """
        self.assertEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...')

    # ----------------------------------- #
    # Unsigned Short Integers - 16 bits   #
    # ----------------------------------- #

    def test_ushort_int(self):
        """
        encode_short(2) must produce the two bytes 0x00 0x02
        """
        self.assertEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...')

    def test_ushort_int_out_of_upper_range(self):
        """
        65536 is one past the largest unsigned short and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_short, 65536)

    def test_ushort_int_out_of_lower_range(self):
        """
        -1 is below the unsigned range and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_short, -1)

    def test_ushort_int_with_fraction(self):
        """
        encoding 2.5 is expected to give the same bytes as encoding 2
        """
        self.assertEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...')

    def test_ushort_int_decode(self):
        """
        decode_short of 0x00 0x02 must return 2
        """
        self.assertEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...')

    # ---------------------------------- #
    # Unsigned Long Integers - 32 bits   #
    # ---------------------------------- #

    def test_ulong_int(self):
        """
        encode_long(2) must produce the four bytes 0x00 0x00 0x00 0x02
        """
        self.assertEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...')

    def test_ulong_int_out_of_upper_range(self):
        """
        4294967296 (2**32) is one past the largest unsigned long and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_long, 4294967296)

    def test_ulong_int_out_of_lower_range(self):
        """
        -1 is below the unsigned range and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_long, -1)

    def test_ulong_int_with_fraction(self):
        """
        encoding 2.5 is expected to give the same bytes as encoding 2
        """
        self.assertEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...')

    def test_ulong_int_decode(self):
        """
        decode_long of 0x00 0x00 0x00 0x02 must return 2
        """
        self.assertEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...')

    # --------------------------------------- #
    # Unsigned Long Long Integers - 64 bits   #
    # --------------------------------------- #

    def test_ulong_long_int(self):
        """
        encode_longlong(2) must produce eight bytes ending in 0x02
        """
        self.assertEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...')

    def test_ulong_long_int_out_of_upper_range(self):
        """
        18446744073709551616 (2**64) is one past the largest unsigned long long and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_longlong, 18446744073709551616)

    def test_ulong_long_int_out_of_lower_range(self):
        """
        -1 is below the unsigned range and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_longlong, -1)

    def test_ulong_long_int_with_fraction(self):
        """
        encoding 2.5 is expected to give the same bytes as encoding 2
        """
        self.assertEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...')

    def test_ulong_long_int_decode(self):
        """
        decode_longlong of the eight encoded bytes must return 2
        """
        self.assertEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...')
|
|
|
|
# -----------------------------------
|
|
# -----------------------------------
|
|
class BitTestCase(BaseDataTypes):
    """
    Encode/decode tests for the bit type.

    The expected bytes show consecutive bits packed into octets with the
    first bit sent in the least significant position.

    The assertions use assertEqual rather than the deprecated
    failUnlessEqual alias.
    """

    def callFunc(self, functionName, *args):
        """
        Overrides the base helper: call the codec method called functionName
        once per argument, flush the codec so pending bits are written out,
        and return the stream contents.
        """
        for ele in args:
            getattr(self.codec, functionName)(ele)

        # bits are only visible in the stream after an explicit codec flush
        self.codec.flush()
        return self.codec.stream.getvalue()

    def test_bit1(self):
        """
        sends in 1, 1 and expects the octet 0x03
        """
        self.assertEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...')

    def test_bit2(self):
        """
        sends in 1, 1, 0, 0, 1 and expects the octet 0x13 (binary 10011)
        """
        self.assertEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...')

    def test_bit3(self):
        """
        sends in 1110100111 [10 bits (right to left), should be packed into two octets: 0xa7 then 0x03]
        """
        self.assertEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...')

    def test_bit_decode_1(self):
        """
        decoding the octet 0x01 must give bit value 1
        """
        self.assertEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...')

    def test_bit_decode_0(self):
        """
        decoding the octet 0x00 must give bit value 0
        """
        self.assertEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...')
|
|
|
|
# -----------------------------------
|
|
# -----------------------------------
|
|
class StringTestCase(BaseDataTypes):
    """
    Encode/decode tests for short strings and long strings.

    The assertions use assertEqual/assertRaises rather than the deprecated
    failUnlessEqual/failUnlessRaises aliases.
    """

    # ------------------------------------------------------------- #
    # Short Strings - 8 bit length followed by zero or more octets  #
    # ------------------------------------------------------------- #

    def test_short_string_zero_length(self):
        """
        an empty short string encodes to just the length octet 0x00
        """
        self.assertEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...')

    def test_short_string_positive_length(self):
        """
        'hello world' encodes to the length octet 0x0b followed by the text
        """
        self.assertEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...')

    def test_short_string_out_of_upper_range(self):
        """
        a 256 character string does not fit an 8 bit length and must be rejected
        """
        self.assertRaises(Exception, self.codec.encode_shortstr, 'x'*256)

    def test_short_string_decode(self):
        """
        decoding 0x0b + 'hello world' must return 'hello world'
        """
        self.assertEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...')

    # ------------------------------------------------------------- #
    # Long Strings - 32 bit length followed by zero or more octets  #
    # ------------------------------------------------------------- #

    def test_long_string_zero_length(self):
        """
        an empty long string encodes to four zero length octets
        """
        self.assertEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...')

    def test_long_string_positive_length(self):
        """
        'hello world' encodes to the 32 bit length 0x0000000b followed by the text
        """
        self.assertEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...')

    def test_long_string_decode(self):
        """
        decoding 0x0000000b + 'hello world' must return 'hello world'
        """
        self.assertEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...')
|
|
|
|
|
|
# --------------------------------------
|
|
# --------------------------------------
|
|
class TimestampTestCase(BaseDataTypes):
    """
    Placeholder with no tests of its own: timestamps are implemented as
    long long values, which the integer test case already covers.
    """
|
|
|
|
# ---------------------------------------
|
|
# ---------------------------------------
|
|
class FieldTableTestCase(BaseDataTypes):
    """
    Encode/decode tests for field tables.

    Only S/I type messages seem to be implemented currently; the tables
    used here hold string ('S') values only.

    The assertions use assertEqual rather than the deprecated
    failUnlessEqual alias.
    """

    def __init__(self, *args):
        """
        Set the two-entry table used by the tests and one valid encoding of it.

        args are passed straight through to BaseDataTypes.__init__.
        """
        BaseDataTypes.__init__(self, *args)
        self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'}
        self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1'

    def test_field_table_name_value_pair(self):
        """
        a single name/value pair encodes to a 32 bit size followed by the entry
        """
        self.assertEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...')

    def test_field_table_multiple_name_value_pair(self):
        """
        two name/value pairs encode to a 32 bit size followed by both entries

        Either entry order is accepted. The original expectation hard-coded
        '$key2' first, which only holds for one particular dict iteration
        order (assumes encode_table writes entries in dict iteration order
        - TODO confirm against qpid/codec.py).
        """
        size = '\x00\x00\x00\x22'
        key1_entry = '\x05$key1S\x00\x00\x00\x06value1'
        key2_entry = '\x05$key2S\x00\x00\x00\x06value2'
        acceptable = (size + key2_entry + key1_entry, size + key1_entry + key2_entry)

        encoded = self.callFunc('encode_table', self.const_field_table_dummy_dict)
        if encoded not in acceptable:
            self.fail('multiple name value pair encoding FAILED...')

    def test_field_table_decode(self):
        """
        decoding the encoded two-entry table must give back the original dict
        """
        self.assertEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...')
|
|
|
|
|
|
# ------------------------------------
|
|
# ------------------------------------
|
|
class ContentTestCase(BaseDataTypes):
    """
    Encode/decode tests for the content data type, in its inline and
    reference forms.

    In the expected bytes, a leading 0x00 marks inline content and a leading
    0x01 marks a reference; a 32 bit length and the payload follow.

    The assertions use assertEqual rather than the deprecated
    failUnlessEqual alias.
    """

    def test_content_inline(self):
        """
        a plain string is encoded as inline content
        """
        self.assertEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...')

    def test_content_reference(self):
        """
        a ReferenceId is encoded as reference content
        """
        self.assertEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...')

    def test_content_inline_decode(self):
        """
        decoding inline content must return the original string
        """
        self.assertEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...')

    def test_content_reference_decode(self):
        """
        decoding reference content must return an object whose id is the original id
        """
        self.assertEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...')
|
|
|
|
# ------------------------ #
|
|
# Pre - existing test code #
|
|
# ------------------------ #
|
|
|
|
# ---------------------
|
|
def test(type, value):
    """
    old test function cut/copy/paste from qpid/codec.py

    Round-trips one or more values through a Codec: every value is encoded
    as `type`, the codec is flushed, and the same number of values is
    decoded back from the stream.

    type  -- name of the codec type, passed to codec.encode / codec.decode
    value -- a single value, or a list/tuple of values to encode in sequence

    Raises AssertionError (showing the values, the encoded bytes and the
    decoded values) when the decoded values differ from the originals.
    """
    # Always work on a list: the decoded values are collected in a list, and
    # a tuple never compares equal to a list, so a tuple argument used to
    # fail the final check even when the round trip was correct.
    if isinstance(value, (list, tuple)):
        values = list(value)
    else:
        values = [value]

    stream = StringIO()
    codec = Codec(stream, SPEC)
    for v in values:
        codec.encode(type, v)
    codec.flush()
    enc = stream.getvalue()

    # rewind so decoding starts at the first encoded byte
    stream.reset()
    dup = [codec.decode(type) for _ in values]

    if values != dup:
        raise AssertionError("%r --> %r --> %r" % (values, enc, dup))
|
|
|
|
# -----------------------
|
|
def dotest(type, value):
    """
    old test function cut/copy/paste from qpid/codec.py

    Forwards type and value unchanged to test().
    """
    test(type, value)
|
|
|
|
# -------------
|
|
def oldtests():
    """
    old test function cut/copy/paste from qpid/codec.py

    Runs dotest() over bit patterns, field tables, the integers 0..255 for
    every integer type, and a few short and long strings.
    """
    bit_patterns = ("1", "0", "110", "011", "11001", "10101", "10011")
    for pattern in bit_patterns:
        # each pattern is repeated 0 to 9 times; zero repeats gives an empty list
        for repeat in range(10):
            dotest("bit", [ch == "1" for ch in pattern * repeat])

    tables = ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1})
    for table in tables:
        dotest("table", table)

    for int_type in ("octet", "short", "long", "longlong"):
        for number in range(256):
            dotest(int_type, number)

    for str_type in ("shortstr", "longstr"):
        for text in ("", "a", "asdf"):
            dotest(str_type, text)
|
|
|
|
# -----------------------------------------
|
|
class oldTests(unittest.TestCase):
    """
    Adapter that lets the pre-existing module-level tests run under unittest.
    """

    def test_oldtestcases(self):
        """
        run oldtests(); any AssertionError it raises fails this test
        """
        outcome = oldtests()
        return outcome
|
|
|
|
# ---------------------------
|
|
# ---------------------------
|
|
# Standalone entry point: run every test case in this module, print a brief
# summary on screen and write the verbose runner output to
# codec_unit_test_output.txt in the current directory.
if __name__ == '__main__':

    codec_test_suite = unittest.TestSuite()

    # the new test cases first, then the pre-existing tests from qpid/codec.py
    for test_case in (IntegerTestCase, BitTestCase, StringTestCase,
                      TimestampTestCase, FieldTableTestCase, ContentTestCase,
                      oldTests):
        codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(test_case))

    # The runner output is captured in memory so that only the summary below
    # reaches the screen. The original call passed '' for both descriptions
    # and verbosity; '' is false as a descriptions flag and, under Python 2's
    # mixed-type ordering, compares greater than 1, which selected verbose
    # output. The same settings are spelled out explicitly here.
    run_output_stream = StringIO()
    test_runner = unittest.TextTestRunner(run_output_stream, descriptions=0, verbosity=2)
    test_result = test_runner.run(codec_test_suite)

    # single-argument print(...) behaves the same as the print statement
    print('\n%d test run...' % test_result.testsRun)

    if test_result.wasSuccessful():
        print('\nAll tests successful\n')

    if test_result.failures:
        print('\n----------')
        print('%d FAILURES:' % len(test_result.failures))
        print('----------\n')
        for failure in test_result.failures:
            # each entry is a (test, traceback text) pair; only the test is listed
            print(str(failure[0]) + ' ... FAIL')

    if test_result.errors:
        print('\n---------')
        print('%d ERRORS:' % len(test_result.errors))
        print('---------\n')

        for error in test_result.errors:
            print(str(error[0]) + ' ... ERROR')

    # try/finally closes the file even when the write fails
    f = open('codec_unit_test_output.txt', 'w')
    try:
        f.write(str(run_output_stream.getvalue()))
    finally:
        f.close()
|