Added support for receiving notifications when AQ messages are available to be

dequeued.
This commit is contained in:
Anthony Tuininga 2018-05-29 21:54:06 -06:00
parent 6583cdfc51
commit 211298209b
10 changed files with 279 additions and 164 deletions

View File

@ -480,11 +480,18 @@ Connection Object
This attribute is an extension to the DB API definition.
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY)
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY, name=None)
Return a new :ref:`subscription object <subscrobj>` using the connection.
Currently the namespace and protocol parameters cannot have any other
meaningful values.
Return a new :ref:`subscription object <subscrobj>` that receives
notifications for events that take place in the database that match the
given parameters.
The namespace parameter specifies the namespace the subscription uses. It
can be one of :data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` or
:data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`.
The protocol parameter specifies the protocol to use when notifications are
sent. Currently the only valid value is :data:`cx_Oracle.SUBSCR_PROTO_OCI`.
The callback is expected to be a callable that accepts a single parameter.
A :ref:`message object <msgobjects>` is passed to this callback whenever a
@ -496,11 +503,12 @@ Connection Object
The operations parameter enables filtering of the messages that are sent
(insert, update, delete). The default value will send notifications for all
operations.
operations. This parameter is only used when the namespace is set to
:data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE`.
The port parameter specifies the listening port for callback notifications
from the database server. If not specified, an unused port will be selected
by the database.
by the Oracle Client libraries.
The qos parameter specifies quality of service options. It should be one or
more of the following flags, OR'ed together:
@ -510,9 +518,10 @@ Connection Object
:data:`cx_Oracle.SUBSCR_QOS_QUERY`,
:data:`cx_Oracle.SUBSCR_QOS_BEST_EFFORT`.
The ipAddress parameter specifies the IP address (IPv4 or IPv6) to bind for
callback notifications from the database server. If not specified, the
client IP address will be determined by the Oracle Client libraries.
The ipAddress parameter specifies the IP address (IPv4 or IPv6) in standard
string notation to bind for callback notifications from the database
server. If not specified, the client IP address will be determined by the
Oracle Client libraries.
The groupingClass parameter specifies what type of grouping of
notifications should take place. Currently, if set, this value can only be
@ -522,8 +531,18 @@ Connection Object
values :data:`cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY` (the default) or
:data:`cx_Oracle.SUBSCR_GROUPING_TYPE_LAST`.
The name parameter is used to identify the subscription and is specific to
the selected namespace. If the namespace parameter is
:data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` then the name is optional and
can be any value. If the namespace parameter is
:data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`, however, the name must be in the
format '<QUEUE_NAME>' for single consumer queues and
'<QUEUE_NAME>:<CONSUMER_NAME>' for multiple consumer queues, and identifies
the queue that will be monitored for messages. The queue name may include
the schema, if needed.
*New in version 6.4:* The parameters ipAddress, groupingClass,
groupingValue and groupingType were added.
groupingValue, groupingType and name were added.
.. note::
@ -531,9 +550,11 @@ Connection Object
.. note::
Do not close the connection before the subscription object is deleted
or the subscription object will not be deregistered in the database.
This is done automatically if connection.close() is never called.
The subscription can be deregistered in the database by calling the
function :meth:`~Connection.unsubscribe()`. If this method is not
called and the connection that was used to create the subscription is
explictly closed using the function :meth:`~Connection.close()`, the
subscription will not be deregistered in the database.
.. attribute:: Connection.tnsentry
@ -546,6 +567,16 @@ Connection Object
This attribute is an extension to the DB API definition.
.. method:: Connection.unsubscribe(subscr)
Unsubscribe from events in the database that were originally subscribed to
using :meth:`~Connection.subscribe()`. The connection used to unsubscribe
should be the same one used to create the subscription, or should access
the same database and be connected as the same user name.
.. versionadded:: 6.4
.. attribute:: Connection.username
This read-only attribute returns the name of the user which established the

View File

@ -651,6 +651,12 @@ values for the :attr:`Message.type` attribute of the messages that are sent
for subscriptions created by the :meth:`Connection.subscribe()` method.
.. data:: EVENT_AQ
This constant is used to specify that one or more messages are available
for dequeuing on the queue specified when the subscription was created.
.. data:: EVENT_DEREG
This constant is used to specify that the subscription has been
@ -847,12 +853,15 @@ These constants are extensions to the DB API definition. They are possible
values for the namespace parameter of the :meth:`Connection.subscribe()`
method.
.. data:: SUBSCR_NAMESPACE_AQ
This constant is used to specify that notifications should be sent when a
queue has messages available to dequeue.
.. data:: SUBSCR_NAMESPACE_DBCHANGE
This constant is used to specify that database change notification or query
change notification messages are to be sent. This is the default value and
currently the only value that is supported.
change notification messages are to be sent. This is the default value.
Subscription Protocols

View File

@ -30,6 +30,23 @@ Subscription Object
This attribute was never intended to be exposed.
.. attribute:: Subscription.ipAddress
This read-only attribute returns the IP address used for callback
notifications from the database server. If not set during construction,
this value is None.
.. versionadded:: 6.4
.. attribute:: Subscription.name
This read-only attribute returns the name used to register the subscription
when it was created.
.. versionadded:: 6.4
.. attribute:: Subscription.namespace
This read-only attribute returns the namespace used to register the
@ -43,13 +60,6 @@ Subscription Object
subscription.
.. attribute:: Subscription.ipAddress
This read-only attribute returns the IP address used for callback
notifications from the database server. If not set during construction,
this value is None.
.. attribute:: Subscription.port
This read-only attribute returns the port used for callback notifications
@ -103,12 +113,6 @@ Message Objects
the notification.
.. attribute:: Message.txid
This read-only attribute returns the id of the transaction that generated
the notification.
.. attribute:: Message.queries
This read-only attribute returns a list of message query objects that give
@ -117,6 +121,25 @@ Message Objects
:data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created.
.. attribute:: Message.queueName
This read-only attribute returns the name of the queue which generated the
notification. It will only be populated if the subscription was created
with the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`.
.. versionadded:: 6.4
.. attribute:: Message.consumerName
This read-only attribute returns the name of the consumer which generated
the notification. It will be populated if the subscription was created with
the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ` and the queue is a
multiple consumer queue.
.. versionadded:: 6.4
.. attribute:: Message.subscription
This read-only attribute returns the subscription object for which this
@ -131,6 +154,12 @@ Message Objects
:data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created.
.. attribute:: Message.txid
This read-only attribute returns the id of the transaction that generated
the notification.
.. attribute:: Message.type
This read-only attribute returns the type of message that has been sent.

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright 2016, 2017, Oracle and/or its affiliates. All rights reserved.
# Copyright 2016, 2018, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
@ -10,7 +10,7 @@
#------------------------------------------------------------------------------
# AdvancedQueuing.py
# This script demonstrates how to use advanced queuing using cx_Oracle. It
# creates a simple type and enqueues and dequeues a few objects.
# makes use of a simple type and queue created in the sample setup.
#
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
@ -29,44 +29,17 @@ import decimal
connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING)
cursor = connection.cursor()
# drop queue table, if present
cursor.execute("""
select count(*)
from user_tables
where table_name = :name""", name = QUEUE_TABLE_NAME)
count, = cursor.fetchone()
if count > 0:
print("Dropping queue table...")
cursor.callproc("dbms_aqadm.drop_queue_table", (QUEUE_TABLE_NAME, True))
# drop type, if present
cursor.execute("""
select count(*)
from user_types
where type_name = :name""", name = BOOK_TYPE_NAME)
count, = cursor.fetchone()
if count > 0:
print("Dropping books type...")
cursor.execute("drop type %s" % BOOK_TYPE_NAME)
# create type
print("Creating books type...")
cursor.execute("""
create type %s as object (
title varchar2(100),
authors varchar2(100),
price number(5,2)
);""" % BOOK_TYPE_NAME)
# create queue table and quueue and start the queue
print("Creating queue table...")
cursor.callproc("dbms_aqadm.create_queue_table",
(QUEUE_TABLE_NAME, BOOK_TYPE_NAME))
cursor.callproc("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME))
cursor.callproc("dbms_aqadm.start_queue", (QUEUE_NAME,))
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
booksType = connection.gettype(BOOK_TYPE_NAME)
book = booksType.newobject()
options = connection.deqoptions()
options.wait = cx_Oracle.DEQ_NO_WAIT
messageProperties = connection.msgproperties()
while connection.deq(QUEUE_NAME, options, messageProperties, book):
pass
# enqueue a few messages
booksType = connection.gettype(BOOK_TYPE_NAME)
book1 = booksType.newobject()
book1.TITLE = "The Fellowship of the Ring"
book1.AUTHORS = "Tolkien, J.R.R."
@ -76,7 +49,6 @@ book2.TITLE = "Harry Potter and the Philosopher's Stone"
book2.AUTHORS = "Rowling, J.K."
book2.PRICE = decimal.Decimal("7.99")
options = connection.enqoptions()
messageProperties = connection.msgproperties()
for book in (book1, book2):
print("Enqueuing book", book.TITLE)
connection.enq(QUEUE_NAME, options, messageProperties, book)
@ -88,4 +60,5 @@ options.navigation = cx_Oracle.DEQ_FIRST_MSG
options.wait = cx_Oracle.DEQ_NO_WAIT
while connection.deq(QUEUE_NAME, options, messageProperties, book):
print("Dequeued book", book.TITLE)
connection.commit()

View File

@ -0,0 +1,47 @@
#------------------------------------------------------------------------------
# Copyright 2018, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# AdvancedQueuingNotification.py
# This script demonstrates using advanced queuing notification. Once this
# script is running, use another session to enqueue a few messages to the
# "BOOKS" queue. This is most easily accomplished by running the
# AdvancedQueuing sample.
#
# This script requires cx_Oracle 6.4 and higher.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import SampleEnv
import threading
import time
registered = True
def callback(message):
global registered
print("Message type:", message.type)
if message.type == cx_Oracle.EVENT_DEREG:
print("Deregistration has taken place...")
registered = False
return
print("Queue name:", message.queueName)
print("Consumer name:", message.consumerName)
connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING, events = True)
sub = connection.subscribe(namespace = cx_Oracle.SUBSCR_NAMESPACE_AQ,
name = "BOOKS", callback = callback, timeout = 300)
print("Subscription:", sub)
print("--> Connection:", sub.connection)
print("--> Callback:", sub.callback)
print("--> Namespace:", sub.namespace)
print("--> Protocol:", sub.protocol)
print("--> Timeout:", sub.timeout)
while registered:
print("Waiting for notifications....")
time.sleep(5)

View File

@ -68,6 +68,13 @@ create or replace type &main_user..udt_Building as object (
);
/
create or replace type &main_user..udt_Book as object (
Title varchar2(100),
Authors varchar2(100),
Price number(5,2)
);
/
-- create tables
create table &main_user..TestNumbers (
@ -147,6 +154,15 @@ create table &main_user..Ptab (
mydata varchar(20)
);
-- create queue table and queues for demonstrating advanced queuing
begin
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE',
'&main_user..UDT_BOOK');
dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE');
dbms_aqadm.start_queue('&main_user..BOOKS');
end;
/
-- populate tables
begin

View File

@ -1457,27 +1457,91 @@ static PyObject *cxoConnection_subscribe(cxoConnection *conn, PyObject* args,
{
static char *keywordList[] = { "namespace", "protocol", "callback",
"timeout", "operations", "port", "qos", "ipAddress",
"groupingClass", "groupingValue", "groupingType", NULL };
uint32_t namespace, protocol, port, timeout, operations, qos;
uint8_t groupingClass, groupingType;
PyObject *callback, *ipAddress;
uint32_t groupingValue;
"groupingClass", "groupingValue", "groupingType", "name", NULL };
PyObject *callback, *ipAddress, *name;
cxoBuffer ipAddressBuffer, nameBuffer;
dpiSubscrCreateParams params;
cxoSubscr *subscr;
groupingClass = 0;
callback = ipAddress = NULL;
timeout = port = qos = groupingValue = 0;
groupingType = DPI_SUBSCR_GROUPING_TYPE_SUMMARY;
namespace = DPI_SUBSCR_NAMESPACE_DBCHANGE;
protocol = DPI_SUBSCR_PROTO_CALLBACK;
operations = DPI_OPCODE_ALL_OPS;
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiiiObib",
keywordList, &namespace, &protocol, &callback, &timeout,
&operations, &port, &qos, &ipAddress, &groupingClass,
&groupingValue, &groupingType))
// get default values for subscription parameters
if (dpiContext_initSubscrCreateParams(cxoDpiContext, &params) < 0)
return cxoError_raiseAndReturnNull();
// validate parameters
callback = name = ipAddress = NULL;
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiiiObibO",
keywordList, &params.subscrNamespace, &params.protocol, &callback,
&params.timeout, &params.operations, &params.portNumber,
&params.qos, &ipAddress, &params.groupingClass,
&params.groupingValue, &params.groupingType, &name))
return NULL;
return (PyObject*) cxoSubscr_new(conn, namespace, protocol, ipAddress,
port, callback, timeout, operations, qos, groupingClass,
groupingValue, groupingType);
// populate IP address in parameters, if applicable
cxoBuffer_init(&ipAddressBuffer);
if (ipAddress) {
if (cxoBuffer_fromObject(&ipAddressBuffer, ipAddress,
conn->encodingInfo.encoding) < 0)
return NULL;
params.ipAddress = ipAddressBuffer.ptr;
params.ipAddressLength = ipAddressBuffer.size;
}
// populate name in parameters, if applicable
cxoBuffer_init(&nameBuffer);
if (name) {
if (cxoBuffer_fromObject(&nameBuffer, name,
conn->encodingInfo.encoding) < 0) {
cxoBuffer_clear(&ipAddressBuffer);
return NULL;
}
params.name = nameBuffer.ptr;
params.nameLength = nameBuffer.size;
}
// create Python subscription object
subscr = (cxoSubscr*) cxoPyTypeSubscr.tp_alloc(&cxoPyTypeSubscr, 0);
if (!subscr) {
cxoBuffer_clear(&ipAddressBuffer);
cxoBuffer_clear(&nameBuffer);
return NULL;
}
Py_INCREF(conn);
subscr->connection = conn;
Py_XINCREF(callback);
subscr->callback = callback;
subscr->namespace = params.subscrNamespace;
subscr->protocol = params.protocol;
Py_XINCREF(ipAddress);
subscr->ipAddress = ipAddress;
Py_XINCREF(name);
subscr->name = name;
subscr->port = params.portNumber;
subscr->timeout = params.timeout;
subscr->operations = params.operations;
subscr->qos = params.qos;
subscr->groupingClass = params.groupingClass;
subscr->groupingValue = params.groupingValue;
subscr->groupingType = params.groupingType;
// populate callback in parameters, if applicable
if (callback) {
params.callback = (dpiSubscrCallback) cxoSubscr_callback;
params.callbackContext = subscr;
}
// create ODPI-C subscription
if (dpiConn_newSubscription(conn->handle, &params, &subscr->handle,
&subscr->id) < 0) {
cxoError_raiseAndReturnNull();
cxoBuffer_clear(&ipAddressBuffer);
cxoBuffer_clear(&nameBuffer);
Py_DECREF(subscr);
return NULL;
}
cxoBuffer_clear(&ipAddressBuffer);
cxoBuffer_clear(&nameBuffer);
return (PyObject*) subscr;
}

View File

@ -438,6 +438,7 @@ static PyObject *cxoModule_initialize(void)
CXO_ADD_INT_CONSTANT("SUBSCR_QOS_BEST_EFFORT", DPI_SUBSCR_QOS_BEST_EFFORT)
// add constants for subscription namespaces
CXO_ADD_INT_CONSTANT("SUBSCR_NAMESPACE_AQ", DPI_SUBSCR_NAMESPACE_AQ)
CXO_ADD_INT_CONSTANT("SUBSCR_NAMESPACE_DBCHANGE",
DPI_SUBSCR_NAMESPACE_DBCHANGE)
@ -459,6 +460,7 @@ static PyObject *cxoModule_initialize(void)
CXO_ADD_INT_CONSTANT("EVENT_DEREG", DPI_EVENT_DEREG)
CXO_ADD_INT_CONSTANT("EVENT_OBJCHANGE", DPI_EVENT_OBJCHANGE)
CXO_ADD_INT_CONSTANT("EVENT_QUERYCHANGE", DPI_EVENT_QUERYCHANGE)
CXO_ADD_INT_CONSTANT("EVENT_AQ", DPI_EVENT_AQ)
// add constants for opcodes
CXO_ADD_INT_CONSTANT("OPCODE_ALLOPS", DPI_OPCODE_ALL_OPS)

View File

@ -279,6 +279,8 @@ struct cxoMessage {
PyObject *txId;
PyObject *tables;
PyObject *queries;
PyObject *queueName;
PyObject *consumerName;
};
struct cxoMessageQuery {
@ -358,6 +360,7 @@ struct cxoSubscr {
cxoConnection *connection;
PyObject *callback;
uint32_t namespace;
PyObject *name;
uint32_t protocol;
PyObject *ipAddress;
uint32_t port;
@ -433,11 +436,7 @@ cxoObjectType *cxoObjectType_new(cxoConnection *connection,
cxoObjectType *cxoObjectType_newByName(cxoConnection *connection,
PyObject *name);
cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace,
uint32_t protocol, PyObject *ipAddress, uint32_t port,
PyObject *callback, uint32_t timeout, uint32_t operations,
uint32_t qos, uint8_t groupingClass, uint32_t groupingValue,
uint8_t groupingType);
void cxoSubscr_callback(cxoSubscr *subscr, dpiSubscrMessage *message);
PyObject *cxoTransform_dateFromTicks(PyObject *args);
int cxoTransform_fromPython(cxoTransformNum transformNum, PyObject *pyValue,

View File

@ -33,6 +33,7 @@ static PyMemberDef cxoSubscrTypeMembers[] = {
{ "connection", T_OBJECT, offsetof(cxoSubscr, connection),
READONLY },
{ "namespace", T_INT, offsetof(cxoSubscr, namespace), READONLY },
{ "name", T_OBJECT, offsetof(cxoSubscr, name), READONLY },
{ "protocol", T_INT, offsetof(cxoSubscr, protocol), READONLY },
{ "ipAddress", T_OBJECT, offsetof(cxoSubscr, ipAddress), READONLY },
{ "port", T_INT, offsetof(cxoSubscr, port), READONLY },
@ -51,6 +52,8 @@ static PyMemberDef cxoMessageTypeMembers[] = {
{ "txid", T_OBJECT, offsetof(cxoMessage, txId), READONLY },
{ "tables", T_OBJECT, offsetof(cxoMessage, tables), READONLY },
{ "queries", T_OBJECT, offsetof(cxoMessage, queries), READONLY },
{ "queueName", T_OBJECT, offsetof(cxoMessage, queueName), READONLY },
{ "consumerName", T_OBJECT, offsetof(cxoMessage, consumerName), READONLY },
{ NULL }
};
@ -413,6 +416,18 @@ static int cxoMessage_initialize(cxoMessage *messageObj,
if (!messageObj->txId)
return -1;
}
if (message->queueName) {
messageObj->queueName = cxoPyString_fromEncodedString(
message->queueName, message->queueNameLength, encoding);
if (!messageObj->queueName)
return -1;
}
if (message->consumerName) {
messageObj->consumerName = cxoPyString_fromEncodedString(
message->consumerName, message->consumerNameLength, encoding);
if (!messageObj->consumerName)
return -1;
}
switch (message->eventType) {
case DPI_EVENT_OBJCHANGE:
messageObj->tables = PyList_New(message->numTables);
@ -494,8 +509,7 @@ static int cxoSubscr_callbackHandler(cxoSubscr *subscr,
// cxoSubscr_callback()
// Routine that is called when a callback needs to be invoked.
//-----------------------------------------------------------------------------
static void cxoSubscr_callback(cxoSubscr *subscr,
dpiSubscrMessage *message)
void cxoSubscr_callback(cxoSubscr *subscr, dpiSubscrMessage *message)
{
#ifdef WITH_THREAD
PyGILState_STATE gstate = PyGILState_Ensure();
@ -513,77 +527,6 @@ static void cxoSubscr_callback(cxoSubscr *subscr,
}
//-----------------------------------------------------------------------------
// cxoSubscr_new()
// Allocate a new subscription object.
//-----------------------------------------------------------------------------
cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace,
uint32_t protocol, PyObject *ipAddress, uint32_t port,
PyObject *callback, uint32_t timeout, uint32_t operations,
uint32_t qos, uint8_t groupingClass, uint32_t groupingValue,
uint8_t groupingType)
{
dpiSubscrCreateParams params;
cxoSubscr *subscr;
cxoBuffer buffer;
subscr = (cxoSubscr*) cxoPyTypeSubscr.tp_alloc(&cxoPyTypeSubscr, 0);
if (!subscr)
return NULL;
Py_INCREF(connection);
subscr->connection = connection;
Py_XINCREF(callback);
subscr->callback = callback;
subscr->namespace = namespace;
subscr->protocol = protocol;
Py_XINCREF(ipAddress);
subscr->ipAddress = ipAddress;
subscr->port = port;
subscr->timeout = timeout;
subscr->operations = operations;
subscr->qos = qos;
subscr->groupingClass = groupingClass;
subscr->groupingValue = groupingValue;
subscr->groupingType = groupingType;
if (dpiContext_initSubscrCreateParams(cxoDpiContext, &params) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(subscr);
return NULL;
}
params.subscrNamespace = namespace;
params.protocol = protocol;
if (ipAddress) {
if (cxoBuffer_fromObject(&buffer, ipAddress,
connection->encodingInfo.encoding) < 0) {
Py_DECREF(subscr);
return NULL;
}
params.ipAddress = buffer.ptr;
params.ipAddressLength = buffer.size;
}
params.portNumber = port;
if (callback) {
params.callback = (dpiSubscrCallback) cxoSubscr_callback;
params.callbackContext = subscr;
}
params.timeout = timeout;
params.operations = operations;
params.qos = qos;
params.groupingClass = groupingClass;
params.groupingValue = groupingValue;
params.groupingType = groupingType;
if (dpiConn_newSubscription(connection->handle, &params, &subscr->handle,
&subscr->id) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(subscr);
return NULL;
}
return subscr;
}
//-----------------------------------------------------------------------------
// cxoSubscr_free()
// Free the memory associated with a subscription.
@ -719,6 +662,8 @@ static void cxoMessage_free(cxoMessage *message)
Py_CLEAR(message->dbname);
Py_CLEAR(message->tables);
Py_CLEAR(message->queries);
Py_CLEAR(message->queueName);
Py_CLEAR(message->consumerName);
Py_TYPE(message)->tp_free((PyObject*) message);
}