diff --git a/doc/src/connection.rst b/doc/src/connection.rst index 1abd83b..4baa4db 100644 --- a/doc/src/connection.rst +++ b/doc/src/connection.rst @@ -480,11 +480,18 @@ Connection Object This attribute is an extension to the DB API definition. -.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY) +.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY, name=None) - Return a new :ref:`subscription object ` using the connection. - Currently the namespace and protocol parameters cannot have any other - meaningful values. + Return a new :ref:`subscription object ` that receives + notifications for events that take place in the database that match the + given parameters. + + The namespace parameter specifies the namespace the subscription uses. It + can be one of :data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` or + :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`. + + The protocol parameter specifies the protocol to use when notifications are + sent. Currently the only valid value is :data:`cx_Oracle.SUBSCR_PROTO_OCI`. The callback is expected to be a callable that accepts a single parameter. A :ref:`message object ` is passed to this callback whenever a @@ -496,11 +503,12 @@ Connection Object The operations parameter enables filtering of the messages that are sent (insert, update, delete). The default value will send notifications for all - operations. + operations. This parameter is only used when the namespace is set to + :data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE`. The port parameter specifies the listening port for callback notifications from the database server. If not specified, an unused port will be selected - by the database. + by the Oracle Client libraries. The qos parameter specifies quality of service options. It should be one or more of the following flags, OR'ed together: @@ -510,9 +518,10 @@ Connection Object :data:`cx_Oracle.SUBSCR_QOS_QUERY`, :data:`cx_Oracle.SUBSCR_QOS_BEST_EFFORT`. - The ipAddress parameter specifies the IP address (IPv4 or IPv6) to bind for - callback notifications from the database server. If not specified, the - client IP address will be determined by the Oracle Client libraries. + The ipAddress parameter specifies the IP address (IPv4 or IPv6) in standard + string notation to bind for callback notifications from the database + server. If not specified, the client IP address will be determined by the + Oracle Client libraries. The groupingClass parameter specifies what type of grouping of notifications should take place. Currently, if set, this value can only be @@ -522,8 +531,18 @@ Connection Object values :data:`cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY` (the default) or :data:`cx_Oracle.SUBSCR_GROUPING_TYPE_LAST`. + The name parameter is used to identify the subscription and is specific to + the selected namespace. If the namespace parameter is + :data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` then the name is optional and + can be any value. If the namespace parameter is + :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`, however, the name must be in the + format '' for single consumer queues and + ':' for multiple consumer queues, and identifies + the queue that will be monitored for messages. The queue name may include + the schema, if needed. + *New in version 6.4:* The parameters ipAddress, groupingClass, - groupingValue and groupingType were added. + groupingValue, groupingType and name were added. .. note:: @@ -531,9 +550,11 @@ Connection Object .. note:: - Do not close the connection before the subscription object is deleted - or the subscription object will not be deregistered in the database. - This is done automatically if connection.close() is never called. + The subscription can be deregistered in the database by calling the + function :meth:`~Connection.unsubscribe()`. If this method is not + called and the connection that was used to create the subscription is + explictly closed using the function :meth:`~Connection.close()`, the + subscription will not be deregistered in the database. .. attribute:: Connection.tnsentry @@ -546,6 +567,16 @@ Connection Object This attribute is an extension to the DB API definition. +.. method:: Connection.unsubscribe(subscr) + + Unsubscribe from events in the database that were originally subscribed to + using :meth:`~Connection.subscribe()`. The connection used to unsubscribe + should be the same one used to create the subscription, or should access + the same database and be connected as the same user name. + + .. versionadded:: 6.4 + + .. attribute:: Connection.username This read-only attribute returns the name of the user which established the diff --git a/doc/src/module.rst b/doc/src/module.rst index a21aca3..ab7e585 100644 --- a/doc/src/module.rst +++ b/doc/src/module.rst @@ -651,6 +651,12 @@ values for the :attr:`Message.type` attribute of the messages that are sent for subscriptions created by the :meth:`Connection.subscribe()` method. +.. data:: EVENT_AQ + + This constant is used to specify that one or more messages are available + for dequeuing on the queue specified when the subscription was created. + + .. data:: EVENT_DEREG This constant is used to specify that the subscription has been @@ -847,12 +853,15 @@ These constants are extensions to the DB API definition. They are possible values for the namespace parameter of the :meth:`Connection.subscribe()` method. +.. data:: SUBSCR_NAMESPACE_AQ + + This constant is used to specify that notifications should be sent when a + queue has messages available to dequeue. .. data:: SUBSCR_NAMESPACE_DBCHANGE This constant is used to specify that database change notification or query - change notification messages are to be sent. This is the default value and - currently the only value that is supported. + change notification messages are to be sent. This is the default value. Subscription Protocols diff --git a/doc/src/subscription.rst b/doc/src/subscription.rst index ca837f9..6ec3cb5 100644 --- a/doc/src/subscription.rst +++ b/doc/src/subscription.rst @@ -30,6 +30,23 @@ Subscription Object This attribute was never intended to be exposed. +.. attribute:: Subscription.ipAddress + + This read-only attribute returns the IP address used for callback + notifications from the database server. If not set during construction, + this value is None. + + .. versionadded:: 6.4 + + +.. attribute:: Subscription.name + + This read-only attribute returns the name used to register the subscription + when it was created. + + .. versionadded:: 6.4 + + .. attribute:: Subscription.namespace This read-only attribute returns the namespace used to register the @@ -43,13 +60,6 @@ Subscription Object subscription. -.. attribute:: Subscription.ipAddress - - This read-only attribute returns the IP address used for callback - notifications from the database server. If not set during construction, - this value is None. - - .. attribute:: Subscription.port This read-only attribute returns the port used for callback notifications @@ -103,12 +113,6 @@ Message Objects the notification. -.. attribute:: Message.txid - - This read-only attribute returns the id of the transaction that generated - the notification. - - .. attribute:: Message.queries This read-only attribute returns a list of message query objects that give @@ -117,6 +121,25 @@ Message Objects :data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created. +.. attribute:: Message.queueName + + This read-only attribute returns the name of the queue which generated the + notification. It will only be populated if the subscription was created + with the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`. + + .. versionadded:: 6.4 + + +.. attribute:: Message.consumerName + + This read-only attribute returns the name of the consumer which generated + the notification. It will be populated if the subscription was created with + the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ` and the queue is a + multiple consumer queue. + + .. versionadded:: 6.4 + + .. attribute:: Message.subscription This read-only attribute returns the subscription object for which this @@ -131,6 +154,12 @@ Message Objects :data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created. +.. attribute:: Message.txid + + This read-only attribute returns the id of the transaction that generated + the notification. + + .. attribute:: Message.type This read-only attribute returns the type of message that has been sent. diff --git a/samples/AdvancedQueuing.py b/samples/AdvancedQueuing.py index 51026d0..681f9c1 100644 --- a/samples/AdvancedQueuing.py +++ b/samples/AdvancedQueuing.py @@ -1,5 +1,5 @@ #------------------------------------------------------------------------------ -# Copyright 2016, 2017, Oracle and/or its affiliates. All rights reserved. +# Copyright 2016, 2018, Oracle and/or its affiliates. All rights reserved. # # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # @@ -10,7 +10,7 @@ #------------------------------------------------------------------------------ # AdvancedQueuing.py # This script demonstrates how to use advanced queuing using cx_Oracle. It -# creates a simple type and enqueues and dequeues a few objects. +# makes use of a simple type and queue created in the sample setup. # # This script requires cx_Oracle 5.3 and higher. #------------------------------------------------------------------------------ @@ -29,44 +29,17 @@ import decimal connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING) cursor = connection.cursor() -# drop queue table, if present -cursor.execute(""" - select count(*) - from user_tables - where table_name = :name""", name = QUEUE_TABLE_NAME) -count, = cursor.fetchone() -if count > 0: - print("Dropping queue table...") - cursor.callproc("dbms_aqadm.drop_queue_table", (QUEUE_TABLE_NAME, True)) - -# drop type, if present -cursor.execute(""" - select count(*) - from user_types - where type_name = :name""", name = BOOK_TYPE_NAME) -count, = cursor.fetchone() -if count > 0: - print("Dropping books type...") - cursor.execute("drop type %s" % BOOK_TYPE_NAME) - -# create type -print("Creating books type...") -cursor.execute(""" - create type %s as object ( - title varchar2(100), - authors varchar2(100), - price number(5,2) - );""" % BOOK_TYPE_NAME) - -# create queue table and quueue and start the queue -print("Creating queue table...") -cursor.callproc("dbms_aqadm.create_queue_table", - (QUEUE_TABLE_NAME, BOOK_TYPE_NAME)) -cursor.callproc("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME)) -cursor.callproc("dbms_aqadm.start_queue", (QUEUE_NAME,)) +# dequeue all existing messages to ensure the queue is empty, just so that +# the results are consistent +booksType = connection.gettype(BOOK_TYPE_NAME) +book = booksType.newobject() +options = connection.deqoptions() +options.wait = cx_Oracle.DEQ_NO_WAIT +messageProperties = connection.msgproperties() +while connection.deq(QUEUE_NAME, options, messageProperties, book): + pass # enqueue a few messages -booksType = connection.gettype(BOOK_TYPE_NAME) book1 = booksType.newobject() book1.TITLE = "The Fellowship of the Ring" book1.AUTHORS = "Tolkien, J.R.R." @@ -76,7 +49,6 @@ book2.TITLE = "Harry Potter and the Philosopher's Stone" book2.AUTHORS = "Rowling, J.K." book2.PRICE = decimal.Decimal("7.99") options = connection.enqoptions() -messageProperties = connection.msgproperties() for book in (book1, book2): print("Enqueuing book", book.TITLE) connection.enq(QUEUE_NAME, options, messageProperties, book) @@ -88,4 +60,5 @@ options.navigation = cx_Oracle.DEQ_FIRST_MSG options.wait = cx_Oracle.DEQ_NO_WAIT while connection.deq(QUEUE_NAME, options, messageProperties, book): print("Dequeued book", book.TITLE) +connection.commit() diff --git a/samples/AdvancedQueuingNotification.py b/samples/AdvancedQueuingNotification.py new file mode 100644 index 0000000..e17068c --- /dev/null +++ b/samples/AdvancedQueuingNotification.py @@ -0,0 +1,47 @@ +#------------------------------------------------------------------------------ +# Copyright 2018, Oracle and/or its affiliates. All rights reserved. +#------------------------------------------------------------------------------ + +#------------------------------------------------------------------------------ +# AdvancedQueuingNotification.py +# This script demonstrates using advanced queuing notification. Once this +# script is running, use another session to enqueue a few messages to the +# "BOOKS" queue. This is most easily accomplished by running the +# AdvancedQueuing sample. +# +# This script requires cx_Oracle 6.4 and higher. +#------------------------------------------------------------------------------ + +from __future__ import print_function + +import cx_Oracle +import SampleEnv +import threading +import time + +registered = True + +def callback(message): + global registered + print("Message type:", message.type) + if message.type == cx_Oracle.EVENT_DEREG: + print("Deregistration has taken place...") + registered = False + return + print("Queue name:", message.queueName) + print("Consumer name:", message.consumerName) + +connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING, events = True) +sub = connection.subscribe(namespace = cx_Oracle.SUBSCR_NAMESPACE_AQ, + name = "BOOKS", callback = callback, timeout = 300) +print("Subscription:", sub) +print("--> Connection:", sub.connection) +print("--> Callback:", sub.callback) +print("--> Namespace:", sub.namespace) +print("--> Protocol:", sub.protocol) +print("--> Timeout:", sub.timeout) + +while registered: + print("Waiting for notifications....") + time.sleep(5) + diff --git a/samples/sql/SetupSamples.sql b/samples/sql/SetupSamples.sql index 68e00e4..91d1884 100644 --- a/samples/sql/SetupSamples.sql +++ b/samples/sql/SetupSamples.sql @@ -68,6 +68,13 @@ create or replace type &main_user..udt_Building as object ( ); / +create or replace type &main_user..udt_Book as object ( + Title varchar2(100), + Authors varchar2(100), + Price number(5,2) +); +/ + -- create tables create table &main_user..TestNumbers ( @@ -147,6 +154,15 @@ create table &main_user..Ptab ( mydata varchar(20) ); +-- create queue table and queues for demonstrating advanced queuing +begin + dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE', + '&main_user..UDT_BOOK'); + dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE'); + dbms_aqadm.start_queue('&main_user..BOOKS'); +end; +/ + -- populate tables begin diff --git a/src/cxoConnection.c b/src/cxoConnection.c index ea3804e..d9a2368 100644 --- a/src/cxoConnection.c +++ b/src/cxoConnection.c @@ -1457,27 +1457,91 @@ static PyObject *cxoConnection_subscribe(cxoConnection *conn, PyObject* args, { static char *keywordList[] = { "namespace", "protocol", "callback", "timeout", "operations", "port", "qos", "ipAddress", - "groupingClass", "groupingValue", "groupingType", NULL }; - uint32_t namespace, protocol, port, timeout, operations, qos; - uint8_t groupingClass, groupingType; - PyObject *callback, *ipAddress; - uint32_t groupingValue; + "groupingClass", "groupingValue", "groupingType", "name", NULL }; + PyObject *callback, *ipAddress, *name; + cxoBuffer ipAddressBuffer, nameBuffer; + dpiSubscrCreateParams params; + cxoSubscr *subscr; - groupingClass = 0; - callback = ipAddress = NULL; - timeout = port = qos = groupingValue = 0; - groupingType = DPI_SUBSCR_GROUPING_TYPE_SUMMARY; - namespace = DPI_SUBSCR_NAMESPACE_DBCHANGE; - protocol = DPI_SUBSCR_PROTO_CALLBACK; - operations = DPI_OPCODE_ALL_OPS; - if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiiiObib", - keywordList, &namespace, &protocol, &callback, &timeout, - &operations, &port, &qos, &ipAddress, &groupingClass, - &groupingValue, &groupingType)) + // get default values for subscription parameters + if (dpiContext_initSubscrCreateParams(cxoDpiContext, ¶ms) < 0) + return cxoError_raiseAndReturnNull(); + + // validate parameters + callback = name = ipAddress = NULL; + if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiiiObibO", + keywordList, ¶ms.subscrNamespace, ¶ms.protocol, &callback, + ¶ms.timeout, ¶ms.operations, ¶ms.portNumber, + ¶ms.qos, &ipAddress, ¶ms.groupingClass, + ¶ms.groupingValue, ¶ms.groupingType, &name)) return NULL; - return (PyObject*) cxoSubscr_new(conn, namespace, protocol, ipAddress, - port, callback, timeout, operations, qos, groupingClass, - groupingValue, groupingType); + + // populate IP address in parameters, if applicable + cxoBuffer_init(&ipAddressBuffer); + if (ipAddress) { + if (cxoBuffer_fromObject(&ipAddressBuffer, ipAddress, + conn->encodingInfo.encoding) < 0) + return NULL; + params.ipAddress = ipAddressBuffer.ptr; + params.ipAddressLength = ipAddressBuffer.size; + } + + // populate name in parameters, if applicable + cxoBuffer_init(&nameBuffer); + if (name) { + if (cxoBuffer_fromObject(&nameBuffer, name, + conn->encodingInfo.encoding) < 0) { + cxoBuffer_clear(&ipAddressBuffer); + return NULL; + } + params.name = nameBuffer.ptr; + params.nameLength = nameBuffer.size; + } + + // create Python subscription object + subscr = (cxoSubscr*) cxoPyTypeSubscr.tp_alloc(&cxoPyTypeSubscr, 0); + if (!subscr) { + cxoBuffer_clear(&ipAddressBuffer); + cxoBuffer_clear(&nameBuffer); + return NULL; + } + Py_INCREF(conn); + subscr->connection = conn; + Py_XINCREF(callback); + subscr->callback = callback; + subscr->namespace = params.subscrNamespace; + subscr->protocol = params.protocol; + Py_XINCREF(ipAddress); + subscr->ipAddress = ipAddress; + Py_XINCREF(name); + subscr->name = name; + subscr->port = params.portNumber; + subscr->timeout = params.timeout; + subscr->operations = params.operations; + subscr->qos = params.qos; + subscr->groupingClass = params.groupingClass; + subscr->groupingValue = params.groupingValue; + subscr->groupingType = params.groupingType; + + // populate callback in parameters, if applicable + if (callback) { + params.callback = (dpiSubscrCallback) cxoSubscr_callback; + params.callbackContext = subscr; + } + + // create ODPI-C subscription + if (dpiConn_newSubscription(conn->handle, ¶ms, &subscr->handle, + &subscr->id) < 0) { + cxoError_raiseAndReturnNull(); + cxoBuffer_clear(&ipAddressBuffer); + cxoBuffer_clear(&nameBuffer); + Py_DECREF(subscr); + return NULL; + } + cxoBuffer_clear(&ipAddressBuffer); + cxoBuffer_clear(&nameBuffer); + + return (PyObject*) subscr; } diff --git a/src/cxoModule.c b/src/cxoModule.c index 80c9f5c..0b511b7 100644 --- a/src/cxoModule.c +++ b/src/cxoModule.c @@ -438,6 +438,7 @@ static PyObject *cxoModule_initialize(void) CXO_ADD_INT_CONSTANT("SUBSCR_QOS_BEST_EFFORT", DPI_SUBSCR_QOS_BEST_EFFORT) // add constants for subscription namespaces + CXO_ADD_INT_CONSTANT("SUBSCR_NAMESPACE_AQ", DPI_SUBSCR_NAMESPACE_AQ) CXO_ADD_INT_CONSTANT("SUBSCR_NAMESPACE_DBCHANGE", DPI_SUBSCR_NAMESPACE_DBCHANGE) @@ -459,6 +460,7 @@ static PyObject *cxoModule_initialize(void) CXO_ADD_INT_CONSTANT("EVENT_DEREG", DPI_EVENT_DEREG) CXO_ADD_INT_CONSTANT("EVENT_OBJCHANGE", DPI_EVENT_OBJCHANGE) CXO_ADD_INT_CONSTANT("EVENT_QUERYCHANGE", DPI_EVENT_QUERYCHANGE) + CXO_ADD_INT_CONSTANT("EVENT_AQ", DPI_EVENT_AQ) // add constants for opcodes CXO_ADD_INT_CONSTANT("OPCODE_ALLOPS", DPI_OPCODE_ALL_OPS) diff --git a/src/cxoModule.h b/src/cxoModule.h index b6a0124..08e30de 100644 --- a/src/cxoModule.h +++ b/src/cxoModule.h @@ -279,6 +279,8 @@ struct cxoMessage { PyObject *txId; PyObject *tables; PyObject *queries; + PyObject *queueName; + PyObject *consumerName; }; struct cxoMessageQuery { @@ -358,6 +360,7 @@ struct cxoSubscr { cxoConnection *connection; PyObject *callback; uint32_t namespace; + PyObject *name; uint32_t protocol; PyObject *ipAddress; uint32_t port; @@ -433,11 +436,7 @@ cxoObjectType *cxoObjectType_new(cxoConnection *connection, cxoObjectType *cxoObjectType_newByName(cxoConnection *connection, PyObject *name); -cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace, - uint32_t protocol, PyObject *ipAddress, uint32_t port, - PyObject *callback, uint32_t timeout, uint32_t operations, - uint32_t qos, uint8_t groupingClass, uint32_t groupingValue, - uint8_t groupingType); +void cxoSubscr_callback(cxoSubscr *subscr, dpiSubscrMessage *message); PyObject *cxoTransform_dateFromTicks(PyObject *args); int cxoTransform_fromPython(cxoTransformNum transformNum, PyObject *pyValue, diff --git a/src/cxoSubscr.c b/src/cxoSubscr.c index 35227e7..1913829 100644 --- a/src/cxoSubscr.c +++ b/src/cxoSubscr.c @@ -33,6 +33,7 @@ static PyMemberDef cxoSubscrTypeMembers[] = { { "connection", T_OBJECT, offsetof(cxoSubscr, connection), READONLY }, { "namespace", T_INT, offsetof(cxoSubscr, namespace), READONLY }, + { "name", T_OBJECT, offsetof(cxoSubscr, name), READONLY }, { "protocol", T_INT, offsetof(cxoSubscr, protocol), READONLY }, { "ipAddress", T_OBJECT, offsetof(cxoSubscr, ipAddress), READONLY }, { "port", T_INT, offsetof(cxoSubscr, port), READONLY }, @@ -51,6 +52,8 @@ static PyMemberDef cxoMessageTypeMembers[] = { { "txid", T_OBJECT, offsetof(cxoMessage, txId), READONLY }, { "tables", T_OBJECT, offsetof(cxoMessage, tables), READONLY }, { "queries", T_OBJECT, offsetof(cxoMessage, queries), READONLY }, + { "queueName", T_OBJECT, offsetof(cxoMessage, queueName), READONLY }, + { "consumerName", T_OBJECT, offsetof(cxoMessage, consumerName), READONLY }, { NULL } }; @@ -413,6 +416,18 @@ static int cxoMessage_initialize(cxoMessage *messageObj, if (!messageObj->txId) return -1; } + if (message->queueName) { + messageObj->queueName = cxoPyString_fromEncodedString( + message->queueName, message->queueNameLength, encoding); + if (!messageObj->queueName) + return -1; + } + if (message->consumerName) { + messageObj->consumerName = cxoPyString_fromEncodedString( + message->consumerName, message->consumerNameLength, encoding); + if (!messageObj->consumerName) + return -1; + } switch (message->eventType) { case DPI_EVENT_OBJCHANGE: messageObj->tables = PyList_New(message->numTables); @@ -494,8 +509,7 @@ static int cxoSubscr_callbackHandler(cxoSubscr *subscr, // cxoSubscr_callback() // Routine that is called when a callback needs to be invoked. //----------------------------------------------------------------------------- -static void cxoSubscr_callback(cxoSubscr *subscr, - dpiSubscrMessage *message) +void cxoSubscr_callback(cxoSubscr *subscr, dpiSubscrMessage *message) { #ifdef WITH_THREAD PyGILState_STATE gstate = PyGILState_Ensure(); @@ -513,77 +527,6 @@ static void cxoSubscr_callback(cxoSubscr *subscr, } -//----------------------------------------------------------------------------- -// cxoSubscr_new() -// Allocate a new subscription object. -//----------------------------------------------------------------------------- -cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace, - uint32_t protocol, PyObject *ipAddress, uint32_t port, - PyObject *callback, uint32_t timeout, uint32_t operations, - uint32_t qos, uint8_t groupingClass, uint32_t groupingValue, - uint8_t groupingType) -{ - dpiSubscrCreateParams params; - cxoSubscr *subscr; - cxoBuffer buffer; - - subscr = (cxoSubscr*) cxoPyTypeSubscr.tp_alloc(&cxoPyTypeSubscr, 0); - if (!subscr) - return NULL; - Py_INCREF(connection); - subscr->connection = connection; - Py_XINCREF(callback); - subscr->callback = callback; - subscr->namespace = namespace; - subscr->protocol = protocol; - Py_XINCREF(ipAddress); - subscr->ipAddress = ipAddress; - subscr->port = port; - subscr->timeout = timeout; - subscr->operations = operations; - subscr->qos = qos; - subscr->groupingClass = groupingClass; - subscr->groupingValue = groupingValue; - subscr->groupingType = groupingType; - - if (dpiContext_initSubscrCreateParams(cxoDpiContext, ¶ms) < 0) { - cxoError_raiseAndReturnNull(); - Py_DECREF(subscr); - return NULL; - } - params.subscrNamespace = namespace; - params.protocol = protocol; - if (ipAddress) { - if (cxoBuffer_fromObject(&buffer, ipAddress, - connection->encodingInfo.encoding) < 0) { - Py_DECREF(subscr); - return NULL; - } - params.ipAddress = buffer.ptr; - params.ipAddressLength = buffer.size; - } - params.portNumber = port; - if (callback) { - params.callback = (dpiSubscrCallback) cxoSubscr_callback; - params.callbackContext = subscr; - } - params.timeout = timeout; - params.operations = operations; - params.qos = qos; - params.groupingClass = groupingClass; - params.groupingValue = groupingValue; - params.groupingType = groupingType; - if (dpiConn_newSubscription(connection->handle, ¶ms, &subscr->handle, - &subscr->id) < 0) { - cxoError_raiseAndReturnNull(); - Py_DECREF(subscr); - return NULL; - } - - return subscr; -} - - //----------------------------------------------------------------------------- // cxoSubscr_free() // Free the memory associated with a subscription. @@ -719,6 +662,8 @@ static void cxoMessage_free(cxoMessage *message) Py_CLEAR(message->dbname); Py_CLEAR(message->tables); Py_CLEAR(message->queries); + Py_CLEAR(message->queueName); + Py_CLEAR(message->consumerName); Py_TYPE(message)->tp_free((PyObject*) message); }