Added support for grouping notifications from subscriptions.

This commit is contained in:
Anthony Tuininga 2018-05-16 14:09:01 -06:00
parent bab41ac544
commit 4ec7685f95
6 changed files with 86 additions and 14 deletions

View File

@ -480,7 +480,7 @@ Connection Object
This attribute is an extension to the DB API definition.
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None)
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY)
Return a new :ref:`subscription object <subscrobj>` using the connection.
Currently the namespace and protocol parameters cannot have any other
@ -498,9 +498,9 @@ Connection Object
(insert, update, delete). The default value will send notifications for all
operations.
The port specifies the listening port for callback notifications from the
database server. If not specified, an unused port will be selected by the
database.
The port parameter specifies the listening port for callback notifications
from the database server. If not specified, an unused port will be selected
by the database.
The qos parameter specifies quality of service options. It should be one or
more of the following flags, OR'ed together:
@ -510,9 +510,20 @@ Connection Object
:data:`cx_Oracle.SUBSCR_QOS_QUERY`,
:data:`cx_Oracle.SUBSCR_QOS_BEST_EFFORT`.
The ipAddress specifies the IP address (IPv4 or IPv6) to bind for callback
notifications from the database server. If not specified, the client IP
address will be determined by the Oracle Client libraries.
The ipAddress parameter specifies the IP address (IPv4 or IPv6) to bind for
callback notifications from the database server. If not specified, the
client IP address will be determined by the Oracle Client libraries.
The groupingClass parameter specifies what type of grouping of
notifications should take place. Currently, if set, this value can only be
set to the value :data:`cx_Oracle.SUBSCR_GROUPING_CLASS_TIME`, which
will group notifications by the number of seconds specified in the
groupingValue parameter. The groupingType parameter should be one of the
values :data:`cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY` (the default) or
:data:`cx_Oracle.SUBSCR_GROUPING_TYPE_LAST`.
*New in version 6.4:* The parameters ipAddress, groupingClass,
groupingValue and groupingType were added.
.. note::

View File

@ -781,6 +781,39 @@ in database resident connection pooling (DRCP).
need not be new and may have prior session state.
Subscription Grouping Classes
-----------------------------
These constants are extensions to the DB API definition. They are possible
values for the groupingClass parameter of the :meth:`Connection.subscribe()`
method.
.. data:: SUBSCR_GROUPING_CLASS_TIME
This constant is used to specify that events are to be grouped by the
period of time in which they are received.
Subscription Grouping Types
---------------------------
These constants are extensions to the DB API definition. They are possible
values for the groupingType parameter of the :meth:`Connection.subscribe()`
method.
.. data:: SUBSCR_GROUPING_TYPE_SUMMARY
This constant is used to specify that when events are grouped a summary of
the events should be sent instead of the individual events. This is the
default value.
.. data:: SUBSCR_GROUPING_TYPE_LAST
This constant is used to specify that when events are grouped the last
event that makes up the group should be sent instead of the individual
events.
Subscription Namespaces
-----------------------

View File

@ -1456,21 +1456,28 @@ static PyObject *cxoConnection_subscribe(cxoConnection *conn, PyObject* args,
PyObject* keywordArgs)
{
static char *keywordList[] = { "namespace", "protocol", "callback",
"timeout", "operations", "port", "qos", "ipAddress", NULL };
"timeout", "operations", "port", "qos", "ipAddress",
"groupingClass", "groupingValue", "groupingType", NULL };
uint32_t namespace, protocol, port, timeout, operations, qos;
uint8_t groupingClass, groupingType;
PyObject *callback, *ipAddress;
uint32_t groupingValue;
timeout = port = qos = 0;
groupingClass = 0;
callback = ipAddress = NULL;
timeout = port = qos = groupingValue = 0;
groupingType = DPI_SUBSCR_GROUPING_TYPE_SUMMARY;
namespace = DPI_SUBSCR_NAMESPACE_DBCHANGE;
protocol = DPI_SUBSCR_PROTO_CALLBACK;
operations = DPI_OPCODE_ALL_OPS;
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiiiO",
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiiiObib",
keywordList, &namespace, &protocol, &callback, &timeout,
&operations, &port, &qos, &ipAddress))
&operations, &port, &qos, &ipAddress, &groupingClass,
&groupingValue, &groupingType))
return NULL;
return (PyObject*) cxoSubscr_new(conn, namespace, protocol, ipAddress,
port, callback, timeout, operations, qos);
port, callback, timeout, operations, qos, groupingClass,
groupingValue, groupingType);
}

View File

@ -439,6 +439,16 @@ static PyObject *cxoModule_initialize(void)
CXO_ADD_INT_CONSTANT("SUBSCR_NAMESPACE_DBCHANGE",
DPI_SUBSCR_NAMESPACE_DBCHANGE)
// add constants for subscription grouping classes
CXO_ADD_INT_CONSTANT("SUBSCR_GROUPING_CLASS_TIME",
DPI_SUBSCR_GROUPING_CLASS_TIME)
// add constants for subscription grouping types
CXO_ADD_INT_CONSTANT("SUBSCR_GROUPING_TYPE_SUMMARY",
DPI_SUBSCR_GROUPING_TYPE_SUMMARY)
CXO_ADD_INT_CONSTANT("SUBSCR_GROUPING_TYPE_LAST",
DPI_SUBSCR_GROUPING_TYPE_LAST)
// add constants for event types
CXO_ADD_INT_CONSTANT("EVENT_NONE", DPI_EVENT_NONE)
CXO_ADD_INT_CONSTANT("EVENT_STARTUP", DPI_EVENT_STARTUP)

View File

@ -364,6 +364,9 @@ struct cxoSubscr {
uint32_t timeout;
uint32_t operations;
uint32_t qos;
uint8_t groupingClass;
uint32_t groupingValue;
uint8_t groupingType;
uint64_t id;
};
@ -433,7 +436,8 @@ cxoObjectType *cxoObjectType_newByName(cxoConnection *connection,
cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace,
uint32_t protocol, PyObject *ipAddress, uint32_t port,
PyObject *callback, uint32_t timeout, uint32_t operations,
uint32_t qos);
uint32_t qos, uint8_t groupingClass, uint32_t groupingValue,
uint8_t groupingType);
PyObject *cxoTransform_dateFromTicks(PyObject *args);
int cxoTransform_fromPython(cxoTransformNum transformNum, PyObject *pyValue,

View File

@ -520,7 +520,8 @@ static void cxoSubscr_callback(cxoSubscr *subscr,
cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace,
uint32_t protocol, PyObject *ipAddress, uint32_t port,
PyObject *callback, uint32_t timeout, uint32_t operations,
uint32_t qos)
uint32_t qos, uint8_t groupingClass, uint32_t groupingValue,
uint8_t groupingType)
{
dpiSubscrCreateParams params;
cxoSubscr *subscr;
@ -541,6 +542,9 @@ cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace,
subscr->timeout = timeout;
subscr->operations = operations;
subscr->qos = qos;
subscr->groupingClass = groupingClass;
subscr->groupingValue = groupingValue;
subscr->groupingType = groupingType;
if (dpiContext_initSubscrCreateParams(cxoDpiContext, &params) < 0) {
cxoError_raiseAndReturnNull();
@ -566,6 +570,9 @@ cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace,
params.timeout = timeout;
params.operations = operations;
params.qos = qos;
params.groupingClass = groupingClass;
params.groupingValue = groupingValue;
params.groupingType = groupingType;
if (dpiConn_newSubscription(connection->handle, &params, &subscr->handle,
&subscr->id) < 0) {
cxoError_raiseAndReturnNull();