45 lines
1.9 KiB
Python

#------------------------------------------------------------------------------
# Copyright 2017, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
"""Module for testing AQ objects."""
import cx_Oracle
class TestAQ(BaseTestCase):
    """Tests for getting and setting attributes on AQ helper objects.

    Each test creates an object from ``self.connection`` (dequeue options,
    enqueue options or message properties) and checks that values assigned
    to its attributes can be read back unchanged.

    NOTE(review): ``self.connection`` is not created in this class; it is
    presumably supplied by ``BaseTestCase`` -- confirm against the base class.
    """

    def __verifyAttribute(self, obj, attrName, value):
        """Assign ``value`` to ``obj.<attrName>`` and assert it reads back.

        Args:
            obj: the object whose attribute is being exercised.
            attrName: name of the attribute to set and then read.
            value: value to assign; the attribute must compare equal to it
                afterwards.
        """
        setattr(obj, attrName, value)
        self.assertEqual(getattr(obj, attrName), value)

    def testDeqOptions(self):
        "test getting/setting dequeue options attributes"
        options = self.connection.deqoptions()
        self.__verifyAttribute(options, "condition", "TEST_CONDITION")
        self.__verifyAttribute(options, "consumername", "TEST_CONSUMERNAME")
        self.__verifyAttribute(options, "correlation", "TEST_CORRELATION")
        self.__verifyAttribute(options, "mode", cx_Oracle.DEQ_LOCKED)
        self.__verifyAttribute(options, "navigation",
                cx_Oracle.DEQ_NEXT_TRANSACTION)
        self.__verifyAttribute(options, "transformation",
                "TEST_TRANSFORMATION")
        # Dequeue options use the DEQ_* visibility constant, in line with the
        # DEQ_* constants used for "mode" and "navigation" above.
        self.__verifyAttribute(options, "visibility", cx_Oracle.DEQ_IMMEDIATE)
        self.__verifyAttribute(options, "wait", 1287)

    def testEnqOptions(self):
        "test getting/setting enqueue options attributes"
        options = self.connection.enqoptions()
        self.__verifyAttribute(options, "visibility", cx_Oracle.ENQ_IMMEDIATE)

    def testMsgProps(self):
        "test getting/setting message properties attributes"
        props = self.connection.msgproperties()
        self.__verifyAttribute(props, "correlation", "TEST_CORRELATION")
        self.__verifyAttribute(props, "delay", 60)
        self.__verifyAttribute(props, "exceptionq", "TEST_EXCEPTIONQ")
        self.__verifyAttribute(props, "expiration", 30)
        # "attempts" is only read, never assigned: a freshly created
        # properties object is expected to report 0.
        self.assertEqual(props.attempts, 0)
        self.__verifyAttribute(props, "priority", 1)
        # "state" is likewise only read: it is expected to be MSG_READY on a
        # freshly created properties object.
        self.assertEqual(props.state, cx_Oracle.MSG_READY)