#------------------------------------------------------------------------------ # Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. #------------------------------------------------------------------------------ """Module for testing subscriptions.""" import threading class SubscriptionData(object): def __init__(self, numMessagesExpected): self.condition = threading.Condition() self.numMessagesExpected = numMessagesExpected self.numMessagesReceived = 0 self.tableOperations = [] self.rowOperations = [] self.rowids = [] def CallbackHandler(self, message): if message.type != cx_Oracle.EVENT_DEREG: table, = message.tables self.tableOperations.append(table.operation) for row in table.rows: self.rowOperations.append(row.operation) self.rowids.append(row.rowid) self.numMessagesReceived += 1 if message.type == cx_Oracle.EVENT_DEREG or \ self.numMessagesReceived == self.numMessagesExpected: self.condition.acquire() self.condition.notify() self.condition.release() class Subscription(BaseTestCase): def testSubscription(self): "test Subscription for insert, update, delete and truncate" self.cursor.execute("truncate table TestTempTable") # expected values tableOperations = [ cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_UPDATE, cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_DELETE, cx_Oracle.OPCODE_ALTER | cx_Oracle.OPCODE_ALLROWS ] rowOperations = [ cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_UPDATE, cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_DELETE ] rowids = [] # set up subscription data = SubscriptionData(5) connection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY, threaded = True, events = True) sub = connection.subscribe(callback = data.CallbackHandler, timeout = 10, qos = cx_Oracle.SUBSCR_QOS_ROWIDS) sub.registerquery("select * from TestTempTable") connection.autocommit = True cursor = connection.cursor() # insert statement cursor.execute("insert into TestTempTable values (1, 'test')") cursor.execute("select rowid from TestTempTable where IntCol = 1") rowids.extend(r for r, in cursor) # update statement cursor.execute(""" update TestTempTable set StringCol = 'update' where IntCol = 1""") cursor.execute("select rowid from TestTempTable where IntCol = 1") rowids.extend(r for r, in cursor) # second insert statement cursor.execute("insert into TestTempTable values (2, 'test2')") cursor.execute("select rowid from TestTempTable where IntCol = 2") rowids.extend(r for r, in cursor) # delete statement cursor.execute("delete TestTempTable where IntCol = 2") rowids.append(rowids[-1]) # truncate table cursor.execute("truncate table TestTempTable") # wait for all messages to be sent data.condition.acquire() data.condition.wait(10) # verify the correct messages were sent self.assertEqual(data.tableOperations, tableOperations) self.assertEqual(data.rowOperations, rowOperations) self.assertEqual(data.rowids, rowids) # test string format of subscription object is as expected fmt = ">" expectedValue = fmt % (USERNAME, TNSENTRY) self.assertEqual(str(sub), expectedValue)