From d4498cf9e0c89219170ae6b30d8071dd965ad8fc Mon Sep 17 00:00:00 2001 From: Anthony Tuininga Date: Fri, 3 May 2019 13:21:39 -0600 Subject: [PATCH] Added support for Advanced Queueing RAW queues and bulk enqueue/dequeue. --- doc/src/aq.rst | 78 ++++++ doc/src/connection.rst | 37 +++ odpi | 2 +- samples/AdvancedQueuing.py | 64 ----- samples/BulkAQ.py | 77 ++++++ samples/ObjectAQ.py | 68 +++++ samples/RawAQ.py | 60 +++++ samples/sql/SetupSamplesExec.sql | 6 + src/cxoConnection.c | 148 ++++++++++- src/cxoDeqOptions.c | 14 +- src/cxoEnqOptions.c | 24 +- src/cxoModule.c | 1 + src/cxoModule.h | 23 +- src/cxoMsgProps.c | 29 ++- src/cxoQueue.c | 429 +++++++++++++++++++++++++++++++ 15 files changed, 967 insertions(+), 93 deletions(-) delete mode 100644 samples/AdvancedQueuing.py create mode 100644 samples/BulkAQ.py create mode 100644 samples/ObjectAQ.py create mode 100644 samples/RawAQ.py create mode 100644 src/cxoQueue.c diff --git a/doc/src/aq.rst b/doc/src/aq.rst index f4ee9f7..4770f6c 100644 --- a/doc/src/aq.rst +++ b/doc/src/aq.rst @@ -4,6 +4,76 @@ Advanced Queuing **************** +.. note:: + + All of these objects are extensions to the DB API. + +.. _queue: + +------ +Queues +------ + +Queues are created using the :meth:`Connection.queue()` method and are used to +enqueue and dequeue messages. + +.. attribute:: Queue.connection + + This read-only attribute returns a reference to the connection object on + which the queue was created. + + +.. method:: Queue.deqMany(maxMessages) + + Dequeues up to the specified number of messages from the queue and returns + a list of these messages. Each element of the returned list is a + :ref:`message property` object. + + +.. method:: Queue.deqOne() + + Dequeues at most one message from the queue. If a message is dequeued, it + will be a :ref:`message property` object; otherwise, it will + be the value None. + +.. attribute:: Queue.deqOptions + + This read-only attribute returns a reference to the :ref:`options + ` that will be used when dequeuing messages from the queue. + + +.. method:: Queue.enqOne(message) + + Enqueues a single message into the queue. The message must be a + :ref:`message property` object which has had its payload + attribute set to a value that the queue supports. + + +.. method:: Queue.enqMany(messages) + + Enqueues multiple messages into the queue. The messages parameter must be a + sequence containing :ref:`message property ` objects which + have all had their payload attribute set to a value that the queue + supports. + + +.. attribute:: Queue.enqOptions + + This read-only attribute returns a reference to the :ref:`options + ` that will be used when enqueuing messages into the queue. + + +.. attribute:: Queue.name + + This read-only attribute returns the name of the queue. + + +.. attribute:: Queue.payloadType + + This read-only attribute returns the object type for payloads that can be + enqueued and dequeued. If using a raw queue, this returns the value None. + + .. _deqoptions: --------------- @@ -205,6 +275,14 @@ Message Properties generated this message. +.. attribute:: MessageProperties.payload + + This attribute identifies the payload that will be enqueued or the payload + that was dequeued when using a :ref:`queue `. When enqueuing, the + value is checked to ensure that it conforms to the type expected by that + queue. + + .. attribute:: MessageProperties.priority This attribute specifies the priority of the message. A smaller number diff --git a/doc/src/connection.rst b/doc/src/connection.rst index 82298ce..56e2c7c 100644 --- a/doc/src/connection.rst +++ b/doc/src/connection.rst @@ -193,6 +193,11 @@ Connection Object .. versionadded:: 5.3 + .. deprecated:: 7.2 + + Use the methods :meth:`Queue.deqOne()` or :meth:`Queue.deqMany()` + instead. + .. note:: This method is an extension to the DB API definition. @@ -205,6 +210,10 @@ Connection Object .. versionadded:: 5.3 + .. deprecated:: 7.2 + + Use the attribute :attr:`Queue.deqOptions` instead. + .. note:: This method is an extension to the DB API definition. @@ -253,6 +262,11 @@ Connection Object .. versionadded:: 5.3 + .. deprecated:: 7.2 + + Use the methods :meth:`Queue.enqOne()` or :meth:`Queue.enqMany()` + instead. + .. note:: This method is an extension to the DB API definition. @@ -265,6 +279,10 @@ Connection Object .. versionadded:: 5.3 + .. deprecated:: 7.2 + + Use the attribute :attr:`Queue.enqOptions` instead. + .. note:: This method is an extension to the DB API definition. @@ -447,6 +465,25 @@ Connection Object This method is an extension to the DB API definition. +.. method:: Connection.queue(name, payloadType=None) + + Creates a :ref:`queue ` which is used to enqueue and dequeue + messages in Advanced Queueing. + + The name parameter is expected to be a string identifying the queue in + which messages are to be enqueued or dequeued. + + The payloadType parameter, if specified, is expected to be an + :ref:`object type ` that identifies the type of payload the + queue expects. If not specified, RAW data is enqueued and dequeued. + + .. versionadded:: 7.2 + + .. note:: + + This method is an extension to the DB API definition. + + .. method:: Connection.rollback() Rollback any pending transactions. diff --git a/odpi b/odpi index 0fe226c..c059aa8 160000 --- a/odpi +++ b/odpi @@ -1 +1 @@ -Subproject commit 0fe226c8b5b15b0cc42ab8e638f4bb875c78d479 +Subproject commit c059aa8e878ed5aecb71359077a79d5beeb60da5 diff --git a/samples/AdvancedQueuing.py b/samples/AdvancedQueuing.py deleted file mode 100644 index 91dddcf..0000000 --- a/samples/AdvancedQueuing.py +++ /dev/null @@ -1,64 +0,0 @@ -#------------------------------------------------------------------------------ -# Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. -# -# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. -# -# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta, -# Canada. All rights reserved. -#------------------------------------------------------------------------------ - -#------------------------------------------------------------------------------ -# AdvancedQueuing.py -# This script demonstrates how to use advanced queuing using cx_Oracle. It -# makes use of a simple type and queue created in the sample setup. -# -# This script requires cx_Oracle 5.3 and higher. -#------------------------------------------------------------------------------ - -from __future__ import print_function - -BOOK_TYPE_NAME = "UDT_BOOK" -QUEUE_NAME = "BOOKS" -QUEUE_TABLE_NAME = "BOOK_QUEUE" - -import cx_Oracle -import SampleEnv -import decimal - -# connect to database -connection = cx_Oracle.connect(SampleEnv.GetMainConnectString()) -cursor = connection.cursor() - -# dequeue all existing messages to ensure the queue is empty, just so that -# the results are consistent -booksType = connection.gettype(BOOK_TYPE_NAME) -book = booksType.newobject() -options = connection.deqoptions() -options.wait = cx_Oracle.DEQ_NO_WAIT -messageProperties = connection.msgproperties() -while connection.deq(QUEUE_NAME, options, messageProperties, book): - pass - -# enqueue a few messages -book1 = booksType.newobject() -book1.TITLE = "The Fellowship of the Ring" -book1.AUTHORS = "Tolkien, J.R.R." -book1.PRICE = decimal.Decimal("10.99") -book2 = booksType.newobject() -book2.TITLE = "Harry Potter and the Philosopher's Stone" -book2.AUTHORS = "Rowling, J.K." -book2.PRICE = decimal.Decimal("7.99") -options = connection.enqoptions() -for book in (book1, book2): - print("Enqueuing book", book.TITLE) - connection.enq(QUEUE_NAME, options, messageProperties, book) -connection.commit() - -# dequeue the messages -options = connection.deqoptions() -options.navigation = cx_Oracle.DEQ_FIRST_MSG -options.wait = cx_Oracle.DEQ_NO_WAIT -while connection.deq(QUEUE_NAME, options, messageProperties, book): - print("Dequeued book", book.TITLE) -connection.commit() - diff --git a/samples/BulkAQ.py b/samples/BulkAQ.py new file mode 100644 index 0000000..bb86c79 --- /dev/null +++ b/samples/BulkAQ.py @@ -0,0 +1,77 @@ +#------------------------------------------------------------------------------ +# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. +# +# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. +# +# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta, +# Canada. All rights reserved. +#------------------------------------------------------------------------------ + +#------------------------------------------------------------------------------ +# BulkAQ.py +# This script demonstrates how to use bulk enqueuing and dequeuing of +# messages with advanced queuing using cx_Oracle. It makes use of a RAW queue +# created in the sample setup. +# +# This script requires cx_Oracle 7.2 and higher. +#------------------------------------------------------------------------------ + +from __future__ import print_function + +import cx_Oracle +import SampleEnv + +QUEUE_NAME = "DEMORAW" +PAYLOAD_DATA = [ + "The first message", + "The second message", + "The third message", + "The fourth message", + "The fifth message", + "The sixth message", + "The seventh message", + "The eighth message", + "The ninth message", + "The tenth message", + "The eleventh message", + "The twelfth and final message" +] + +# connect to database +connection = cx_Oracle.connect(SampleEnv.GetMainConnectString()) +cursor = connection.cursor() + +# create queue +queue = connection.queue(QUEUE_NAME) +queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT +queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG + +# dequeue all existing messages to ensure the queue is empty, just so that +# the results are consistent +while queue.deqOne(): + pass + +# enqueue a few messages +print("Enqueuing messages...") +batchSize = 6 +dataToEnq = PAYLOAD_DATA +while dataToEnq: + batchData = dataToEnq[:batchSize] + dataToEnq = dataToEnq[batchSize:] + messages = [connection.msgproperties(payload=d) for d in batchData] + for data in batchData: + print(data) + queue.enqMany(messages) +connection.commit() + +# dequeue the messages +print("\nDequeuing messages...") +batchSize = 8 +while True: + messages = queue.deqMany(batchSize) + if not messages: + break + for props in messages: + print(props.payload.decode()) +connection.commit() +print("\nDone.") diff --git a/samples/ObjectAQ.py b/samples/ObjectAQ.py new file mode 100644 index 0000000..7721971 --- /dev/null +++ b/samples/ObjectAQ.py @@ -0,0 +1,68 @@ +#------------------------------------------------------------------------------ +# Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. +# +# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. +# +# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta, +# Canada. All rights reserved. +#------------------------------------------------------------------------------ + +#------------------------------------------------------------------------------ +# ObjectAQ.py +# This script demonstrates how to use advanced queuing with objects using +# cx_Oracle. It makes use of a simple type and queue created in the sample +# setup. +# +# This script requires cx_Oracle 7.2 and higher. +#------------------------------------------------------------------------------ + +from __future__ import print_function + +import cx_Oracle +import SampleEnv +import decimal + +BOOK_TYPE_NAME = "UDT_BOOK" +QUEUE_NAME = "BOOKS" +BOOK_DATA = [ + ("The Fellowship of the Ring", "Tolkien, J.R.R.", + decimal.Decimal("10.99")), + ("Harry Potter and the Philosopher's Stone", "Rowling, J.K.", + decimal.Decimal("7.99")) +] + +# connect to database +connection = cx_Oracle.connect(SampleEnv.GetMainConnectString()) +cursor = connection.cursor() + +# create queue +booksType = connection.gettype(BOOK_TYPE_NAME) +queue = connection.queue(QUEUE_NAME, booksType) +queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT +queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG + +# dequeue all existing messages to ensure the queue is empty, just so that +# the results are consistent +while queue.deqOne(): + pass + +# enqueue a few messages +print("Enqueuing messages...") +for title, authors, price in BOOK_DATA: + book = booksType.newobject() + book.TITLE = title + book.AUTHORS = authors + book.PRICE = price + print(title) + queue.enqOne(connection.msgproperties(payload=book)) +connection.commit() + +# dequeue the messages +print("\nDequeuing messages...") +while True: + props = queue.deqOne() + if not props: + break + print(props.payload.TITLE) +connection.commit() +print("\nDone.") diff --git a/samples/RawAQ.py b/samples/RawAQ.py new file mode 100644 index 0000000..e48131d --- /dev/null +++ b/samples/RawAQ.py @@ -0,0 +1,60 @@ +#------------------------------------------------------------------------------ +# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. +# +# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. +# +# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta, +# Canada. All rights reserved. +#------------------------------------------------------------------------------ + +#------------------------------------------------------------------------------ +# RawAQ.py +# This script demonstrates how to use advanced queuing with RAW data using +# cx_Oracle. It makes use of a RAW queue created in the sample setup. +# +# This script requires cx_Oracle 7.2 and higher. +#------------------------------------------------------------------------------ + +from __future__ import print_function + +import cx_Oracle +import SampleEnv + +QUEUE_NAME = "DEMORAW" +PAYLOAD_DATA = [ + "The first message", + "The second message", + "The third message", + "The fourth and final message" +] + +# connect to database +connection = cx_Oracle.connect(SampleEnv.GetMainConnectString()) +cursor = connection.cursor() + +# create queue +queue = connection.queue(QUEUE_NAME) +queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT +queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG + +# dequeue all existing messages to ensure the queue is empty, just so that +# the results are consistent +while queue.deqOne(): + pass + +# enqueue a few messages +print("Enqueuing messages...") +for data in PAYLOAD_DATA: + print(data) + queue.enqOne(connection.msgproperties(payload=data)) +connection.commit() + +# dequeue the messages +print("\nDequeuing messages...") +while True: + props = queue.deqOne() + if not props: + break + print(props.payload.decode()) +connection.commit() +print("\nDone.") diff --git a/samples/sql/SetupSamplesExec.sql b/samples/sql/SetupSamplesExec.sql index 4a57116..a9d52c3 100644 --- a/samples/sql/SetupSamplesExec.sql +++ b/samples/sql/SetupSamplesExec.sql @@ -192,10 +192,16 @@ create table &main_user..PlsqlSessionCallbacks ( -- create queue table and queues for demonstrating advanced queuing begin + dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE', '&main_user..UDT_BOOK'); dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE'); dbms_aqadm.start_queue('&main_user..BOOKS'); + + dbms_aqadm.create_queue_table('&main_user..RAW_QUEUE', 'RAW'); + dbms_aqadm.create_queue('&main_user..DEMORAW', '&main_user..RAW_QUEUE'); + dbms_aqadm.start_queue('&main_user..DEMORAW'); + end; / diff --git a/src/cxoConnection.c b/src/cxoConnection.c index 78d115e..3c07789 100644 --- a/src/cxoConnection.c +++ b/src/cxoConnection.c @@ -41,10 +41,12 @@ static PyObject *cxoConnection_createLob(cxoConnection*, PyObject*); static PyObject *cxoConnection_getStmtCacheSize(cxoConnection*, void*); static PyObject *cxoConnection_newEnqueueOptions(cxoConnection*, PyObject*); static PyObject *cxoConnection_newDequeueOptions(cxoConnection*, PyObject*); -static PyObject *cxoConnection_newMessageProperties(cxoConnection*, PyObject*); +static PyObject *cxoConnection_newMessageProperties(cxoConnection*, PyObject*, + PyObject*); static PyObject *cxoConnection_dequeue(cxoConnection*, PyObject*, PyObject*); static PyObject *cxoConnection_enqueue(cxoConnection*, PyObject*, PyObject*); static PyObject *cxoConnection_ping(cxoConnection*, PyObject*); +static PyObject *cxoConnection_queue(cxoConnection*, PyObject*, PyObject*); static PyObject *cxoConnection_shutdown(cxoConnection*, PyObject*, PyObject*); static PyObject *cxoConnection_startup(cxoConnection*, PyObject*, PyObject*); static PyObject *cxoConnection_subscribe(cxoConnection*, PyObject*, PyObject*); @@ -103,11 +105,13 @@ static PyMethodDef cxoConnectionMethods[] = { { "enqoptions", (PyCFunction) cxoConnection_newEnqueueOptions, METH_NOARGS }, { "msgproperties", (PyCFunction) cxoConnection_newMessageProperties, - METH_NOARGS }, + METH_VARARGS | METH_KEYWORDS }, { "deq", (PyCFunction) cxoConnection_dequeue, METH_VARARGS | METH_KEYWORDS }, { "enq", (PyCFunction) cxoConnection_enqueue, METH_VARARGS | METH_KEYWORDS }, + { "queue", (PyCFunction) cxoConnection_queue, + METH_VARARGS | METH_KEYWORDS }, { "createlob", (PyCFunction) cxoConnection_createLob, METH_O }, { "getSodaDatabase", (PyCFunction) cxoConnection_getSodaDatabase, METH_NOARGS }, @@ -1321,7 +1325,7 @@ static PyObject *cxoConnection_cancel(cxoConnection *conn, PyObject *args) static PyObject *cxoConnection_newEnqueueOptions(cxoConnection *conn, PyObject *args) { - return (PyObject*) cxoEnqOptions_new(conn); + return (PyObject*) cxoEnqOptions_new(conn, NULL); } @@ -1332,7 +1336,7 @@ static PyObject *cxoConnection_newEnqueueOptions(cxoConnection *conn, static PyObject *cxoConnection_newDequeueOptions(cxoConnection *conn, PyObject *args) { - return (PyObject*) cxoDeqOptions_new(conn); + return (PyObject*) cxoDeqOptions_new(conn, NULL); } @@ -1341,9 +1345,98 @@ static PyObject *cxoConnection_newDequeueOptions(cxoConnection *conn, // Creates a new message properties object and returns it. //----------------------------------------------------------------------------- static PyObject *cxoConnection_newMessageProperties(cxoConnection *conn, - PyObject *args) + PyObject *args, PyObject *keywordArgs) { - return (PyObject*) cxoMsgProps_new(conn); + static char *keywordList[] = { "payload", "correlation", "delay", + "exceptionQ", "expiration", "priority", NULL }; + PyObject *payloadObj, *correlationObj, *exceptionQObj; + int delay, expiration, priority, status; + cxoMsgProps *props; + cxoBuffer buffer; + + // parse arguments + expiration = -1; + delay = priority = 0; + payloadObj = correlationObj = exceptionQObj = NULL; + if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|OOiOii", keywordList, + &payloadObj, &correlationObj, &delay, &exceptionQObj, &expiration, + &priority)) + return NULL; + + // create new message properties object + props = cxoMsgProps_new(conn, NULL); + if (!props) + return NULL; + + // set payload, if applicable + if (payloadObj) { + Py_INCREF(payloadObj); + props->payload = payloadObj; + } + + // set correlation, if applicable + if (correlationObj) { + if (cxoBuffer_fromObject(&buffer, correlationObj, + props->encoding) < 0) { + Py_DECREF(props); + return NULL; + } + status = dpiMsgProps_setCorrelation(props->handle, buffer.ptr, + buffer.size); + cxoBuffer_clear(&buffer); + if (status < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(props); + return NULL; + } + } + + // set delay, if applicable + if (delay != 0) { + if (dpiMsgProps_setDelay(props->handle, (int32_t) delay) < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(props); + return NULL; + } + } + + // set exception queue, if applicable + if (exceptionQObj) { + if (cxoBuffer_fromObject(&buffer, exceptionQObj, + props->encoding) < 0) { + Py_DECREF(props); + return NULL; + } + status = dpiMsgProps_setExceptionQ(props->handle, buffer.ptr, + buffer.size); + cxoBuffer_clear(&buffer); + if (status < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(props); + return NULL; + } + } + + // set expiration, if applicable + if (expiration != -1) { + if (dpiMsgProps_setExpiration(props->handle, + (int32_t) expiration) < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(props); + return NULL; + } + } + + // set priority, if applicable + if (priority != 0) { + if (dpiMsgProps_setPriority(props->handle, (int32_t) priority) < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(props); + return NULL; + } + } + + return (PyObject*) props; } @@ -1431,6 +1524,49 @@ static PyObject *cxoConnection_enqueue(cxoConnection *conn, PyObject* args, } +//----------------------------------------------------------------------------- +// cxoConnection_queue() +// Creates a new queue associated with the connection and returns it to the +// caller. +//----------------------------------------------------------------------------- +static PyObject *cxoConnection_queue(cxoConnection *conn, PyObject* args, + PyObject* keywordArgs) +{ + static char *keywordList[] = { "name", "type", NULL }; + cxoObjectType *typeObj; + cxoBuffer nameBuffer; + PyObject *nameObj; + dpiQueue *handle; + cxoQueue *queue; + int status; + + // parse arguments + typeObj = NULL; + if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "O|O!", keywordList, + &nameObj, &cxoPyTypeObjectType, &typeObj)) + return NULL; + if (cxoBuffer_fromObject(&nameBuffer, nameObj, + conn->encodingInfo.encoding) < 0) + return NULL; + + // create queue + status = dpiConn_newQueue(conn->handle, nameBuffer.ptr, nameBuffer.size, + (typeObj) ? typeObj->handle : NULL, &handle); + cxoBuffer_clear(&nameBuffer); + if (status < 0) + return cxoError_raiseAndReturnNull(); + queue = cxoQueue_new(conn, handle); + if (!queue) + return NULL; + Py_INCREF(nameObj); + queue->name = nameObj; + Py_XINCREF(typeObj); + queue->payloadType = typeObj; + + return (PyObject*) queue; +} + + //----------------------------------------------------------------------------- // cxoConnection_contextManagerEnter() // Called when the connection is used as a context manager and simply returns diff --git a/src/cxoDeqOptions.c b/src/cxoDeqOptions.c index 4eadb5b..dd3e195 100644 --- a/src/cxoDeqOptions.c +++ b/src/cxoDeqOptions.c @@ -118,19 +118,27 @@ PyTypeObject cxoPyTypeDeqOptions = { // cxoDeqOptions_new() // Create a new dequeue options object. //----------------------------------------------------------------------------- -cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection) +cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection, + dpiDeqOptions *handle) { cxoDeqOptions *options; + int status; options = (cxoDeqOptions*) cxoPyTypeDeqOptions.tp_alloc(&cxoPyTypeDeqOptions, 0); if (!options) return NULL; - if (dpiConn_newDeqOptions(connection->handle, &options->handle) < 0) { - Py_DECREF(options); + if (handle) { + status = dpiDeqOptions_addRef(handle); + } else { + status = dpiConn_newDeqOptions(connection->handle, &handle); + } + if (status < 0) { cxoError_raiseAndReturnNull(); + Py_DECREF(options); return NULL; } + options->handle = handle; options->encoding = connection->encodingInfo.encoding; return options; diff --git a/src/cxoEnqOptions.c b/src/cxoEnqOptions.c index 237415a..39ccb7e 100644 --- a/src/cxoEnqOptions.c +++ b/src/cxoEnqOptions.c @@ -90,22 +90,30 @@ PyTypeObject cxoPyTypeEnqOptions = { // cxoEnqOptions_new() // Create a new enqueue options object. //----------------------------------------------------------------------------- -cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection) +cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection, + dpiEnqOptions *handle) { - cxoEnqOptions *self; + cxoEnqOptions *options; + int status; - self = (cxoEnqOptions*) + options = (cxoEnqOptions*) cxoPyTypeEnqOptions.tp_alloc(&cxoPyTypeEnqOptions, 0); - if (!self) + if (!options) return NULL; - if (dpiConn_newEnqOptions(connection->handle, &self->handle) < 0) { - Py_DECREF(self); + if (handle) { + status = dpiEnqOptions_addRef(handle); + } else { + status = dpiConn_newEnqOptions(connection->handle, &handle); + } + if (status < 0) { cxoError_raiseAndReturnNull(); + Py_DECREF(options); return NULL; } - self->encoding = connection->encodingInfo.encoding; + options->handle = handle; + options->encoding = connection->encodingInfo.encoding; - return self; + return options; } diff --git a/src/cxoModule.c b/src/cxoModule.c index f867c24..d9a9a99 100644 --- a/src/cxoModule.c +++ b/src/cxoModule.c @@ -277,6 +277,7 @@ static PyObject *cxoModule_initialize(void) CXO_MAKE_TYPE_READY(&cxoPyTypeObject); CXO_MAKE_TYPE_READY(&cxoPyTypeObjectType); CXO_MAKE_TYPE_READY(&cxoPyTypeObjectVar); + CXO_MAKE_TYPE_READY(&cxoPyTypeQueue); CXO_MAKE_TYPE_READY(&cxoPyTypeRowidVar); CXO_MAKE_TYPE_READY(&cxoPyTypeSessionPool); CXO_MAKE_TYPE_READY(&cxoPyTypeSodaCollection); diff --git a/src/cxoModule.h b/src/cxoModule.h index 6de88ec..e2043dc 100644 --- a/src/cxoModule.h +++ b/src/cxoModule.h @@ -79,6 +79,7 @@ typedef struct cxoMsgProps cxoMsgProps; typedef struct cxoObject cxoObject; typedef struct cxoObjectAttr cxoObjectAttr; typedef struct cxoObjectType cxoObjectType; +typedef struct cxoQueue cxoQueue; typedef struct cxoSessionPool cxoSessionPool; typedef struct cxoSodaCollection cxoSodaCollection; typedef struct cxoSodaDatabase cxoSodaDatabase; @@ -140,6 +141,7 @@ extern PyTypeObject cxoPyTypeObject; extern PyTypeObject cxoPyTypeObjectAttr; extern PyTypeObject cxoPyTypeObjectType; extern PyTypeObject cxoPyTypeObjectVar; +extern PyTypeObject cxoPyTypeQueue; extern PyTypeObject cxoPyTypeRowidVar; extern PyTypeObject cxoPyTypeSessionPool; extern PyTypeObject cxoPyTypeSodaCollection; @@ -319,6 +321,7 @@ struct cxoMessageTable { struct cxoMsgProps { PyObject_HEAD dpiMsgProps *handle; + PyObject *payload; const char *encoding; }; @@ -351,6 +354,16 @@ struct cxoObjectType { char isCollection; }; +struct cxoQueue { + PyObject_HEAD + cxoConnection *conn; + dpiQueue *handle; + PyObject *name; + PyObject *deqOptions; + PyObject *enqOptions; + cxoObjectType *payloadType; +}; + struct cxoSessionPool { PyObject_HEAD dpiPool *handle; @@ -462,9 +475,11 @@ int cxoCursor_performBind(cxoCursor *cursor); int cxoCursor_setBindVariables(cxoCursor *cursor, PyObject *parameters, unsigned numElements, unsigned arrayPos, int deferTypeAssignment); -cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection); +cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection, + dpiDeqOptions *handle); -cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection); +cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection, + dpiEnqOptions *handle); cxoError *cxoError_newFromInfo(dpiErrorInfo *errorInfo); int cxoError_raiseAndReturnInt(void); @@ -476,7 +491,7 @@ PyObject *cxoError_raiseFromString(PyObject *exceptionType, PyObject *cxoLob_new(cxoConnection *connection, dpiOracleTypeNum oracleTypeNum, dpiLob *handle); -cxoMsgProps *cxoMsgProps_new(cxoConnection*); +cxoMsgProps *cxoMsgProps_new(cxoConnection*, dpiMsgProps *handle); int cxoObject_internalExtend(cxoObject *obj, PyObject *sequence); PyObject *cxoObject_new(cxoObjectType *objectType, dpiObject *handle); @@ -489,6 +504,8 @@ cxoObjectType *cxoObjectType_new(cxoConnection *connection, cxoObjectType *cxoObjectType_newByName(cxoConnection *connection, PyObject *name); +cxoQueue *cxoQueue_new(cxoConnection *conn, dpiQueue *handle); + cxoSodaCollection *cxoSodaCollection_new(cxoSodaDatabase *db, dpiSodaColl *handle); diff --git a/src/cxoMsgProps.c b/src/cxoMsgProps.c index 97991a7..5c0a481 100644 --- a/src/cxoMsgProps.c +++ b/src/cxoMsgProps.c @@ -15,7 +15,7 @@ #include "cxoModule.h" //----------------------------------------------------------------------------- -// Declaration of methods used for message properties +// forward declarations //----------------------------------------------------------------------------- static void cxoMsgProps_free(cxoMsgProps*); static PyObject *cxoMsgProps_getNumAttempts(cxoMsgProps*, void*); @@ -37,9 +37,18 @@ static int cxoMsgProps_setPriority(cxoMsgProps*, PyObject*, void*); //----------------------------------------------------------------------------- -// declaration of calculated members for Python type "MessageProperties" +// declaration of members //----------------------------------------------------------------------------- -static PyGetSetDef cxoMsgPropsCalcMembers[] = { +static PyMemberDef cxoMembers[] = { + { "payload", T_OBJECT, offsetof(cxoMsgProps, payload), 0 }, + { NULL } +}; + + +//----------------------------------------------------------------------------- +// declaration of calculated members +//----------------------------------------------------------------------------- +static PyGetSetDef cxoCalcMembers[] = { { "attempts", (getter) cxoMsgProps_getNumAttempts, 0, 0, 0 }, { "correlation", (getter) cxoMsgProps_getCorrelation, (setter) cxoMsgProps_setCorrelation, 0, 0 }, @@ -92,8 +101,8 @@ PyTypeObject cxoPyTypeMsgProps = { 0, // tp_iter 0, // tp_iternext 0, // tp_methods - 0, // tp_members - cxoMsgPropsCalcMembers, // tp_getset + cxoMembers, // tp_members + cxoCalcMembers, // tp_getset 0, // tp_base 0, // tp_dict 0, // tp_descr_get @@ -112,18 +121,22 @@ PyTypeObject cxoPyTypeMsgProps = { // cxoMsgProps_new() // Create a new message properties object. //----------------------------------------------------------------------------- -cxoMsgProps *cxoMsgProps_new(cxoConnection *connection) +cxoMsgProps *cxoMsgProps_new(cxoConnection *connection, dpiMsgProps *handle) { cxoMsgProps *props; props = (cxoMsgProps*) cxoPyTypeMsgProps.tp_alloc(&cxoPyTypeMsgProps, 0); - if (!props) + if (!props) { + if (handle) + dpiMsgProps_release(handle); return NULL; - if (dpiConn_newMsgProps(connection->handle, &props->handle) < 0) { + } + if (!handle && dpiConn_newMsgProps(connection->handle, &handle) < 0) { Py_DECREF(props); cxoError_raiseAndReturnNull(); return NULL; } + props->handle = handle; props->encoding = connection->encodingInfo.encoding; return props; diff --git a/src/cxoQueue.c b/src/cxoQueue.c new file mode 100644 index 0000000..6b7acd9 --- /dev/null +++ b/src/cxoQueue.c @@ -0,0 +1,429 @@ +//----------------------------------------------------------------------------- +// Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. +//----------------------------------------------------------------------------- + +//----------------------------------------------------------------------------- +// cxoQueue.c +// Defines the routines for handling queues (advanced queuing). These queues +// permit sending and receiving messages defined by the database. +//----------------------------------------------------------------------------- + +#include "cxoModule.h" + +//----------------------------------------------------------------------------- +// Declaration of functions +//----------------------------------------------------------------------------- +static void cxoQueue_free(cxoQueue*); +static PyObject *cxoQueue_repr(cxoQueue*); +static PyObject *cxoQueue_deqMany(cxoQueue*, PyObject*); +static PyObject *cxoQueue_deqOne(cxoQueue*, PyObject*); +static PyObject *cxoQueue_enqMany(cxoQueue*, PyObject*); +static PyObject *cxoQueue_enqOne(cxoQueue*, PyObject*); + + +//----------------------------------------------------------------------------- +// declaration of methods +//----------------------------------------------------------------------------- +static PyMethodDef cxoMethods[] = { + { "deqMany", (PyCFunction) cxoQueue_deqMany, METH_VARARGS }, + { "deqOne", (PyCFunction) cxoQueue_deqOne, METH_NOARGS }, + { "enqMany", (PyCFunction) cxoQueue_enqMany, METH_VARARGS }, + { "enqOne", (PyCFunction) cxoQueue_enqOne, METH_VARARGS }, + { NULL } +}; + + +//----------------------------------------------------------------------------- +// declaration of members +//----------------------------------------------------------------------------- +static PyMemberDef cxoMembers[] = { + { "connection", T_OBJECT, offsetof(cxoQueue, conn), READONLY }, + { "deqOptions", T_OBJECT, offsetof(cxoQueue, deqOptions), READONLY }, + { "enqOptions", T_OBJECT, offsetof(cxoQueue, enqOptions), READONLY }, + { "name", T_OBJECT, offsetof(cxoQueue, name), READONLY }, + { "payloadType", T_OBJECT, offsetof(cxoQueue, payloadType), READONLY }, + { NULL } +}; + + +//----------------------------------------------------------------------------- +// Python type declarations +//----------------------------------------------------------------------------- +PyTypeObject cxoPyTypeQueue = { + PyVarObject_HEAD_INIT(NULL, 0) + "cx_Oracle.Queue", // tp_name + sizeof(cxoQueue), // tp_basicsize + 0, // tp_itemsize + (destructor) cxoQueue_free, // tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + (reprfunc) cxoQueue_repr, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + 0, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT, // tp_flags + 0, // tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + cxoMethods, // tp_methods + cxoMembers, // tp_members + 0, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + 0, // tp_init + 0, // tp_alloc + 0, // tp_new + 0, // tp_free + 0, // tp_is_gc + 0 // tp_bases +}; + + +//----------------------------------------------------------------------------- +// cxoQueue_new() +// Create a new queue (advanced queuing). +//----------------------------------------------------------------------------- +cxoQueue *cxoQueue_new(cxoConnection *conn, dpiQueue *handle) +{ + dpiDeqOptions *deqOptions; + dpiEnqOptions *enqOptions; + cxoQueue *queue; + + // create queue and populate basic attributes + queue = (cxoQueue*) cxoPyTypeQueue.tp_alloc(&cxoPyTypeQueue, 0); + if (!queue) { + dpiQueue_release(handle); + return NULL; + } + Py_INCREF(conn); + queue->conn = conn; + queue->handle = handle; + + // get dequeue options + if (dpiQueue_getDeqOptions(queue->handle, &deqOptions) < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(queue); + return NULL; + } + queue->deqOptions = (PyObject*) cxoDeqOptions_new(conn, deqOptions); + if (!queue->deqOptions) { + Py_DECREF(queue); + return NULL; + } + + // get enqueue options + if (dpiQueue_getEnqOptions(queue->handle, &enqOptions) < 0) { + cxoError_raiseAndReturnNull(); + Py_DECREF(queue); + return NULL; + } + queue->enqOptions = (PyObject*) cxoEnqOptions_new(conn, enqOptions); + if (!queue->enqOptions) { + Py_DECREF(queue); + return NULL; + } + + return queue; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_free() +// Free the memory associated with a queue. +//----------------------------------------------------------------------------- +static void cxoQueue_free(cxoQueue *queue) +{ + if (queue->handle) { + dpiQueue_release(queue->handle); + queue->handle = NULL; + } + Py_CLEAR(queue->conn); + Py_CLEAR(queue->name); + Py_CLEAR(queue->payloadType); + Py_CLEAR(queue->deqOptions); + Py_CLEAR(queue->enqOptions); + Py_TYPE(queue)->tp_free((PyObject*) queue); +} + + +//----------------------------------------------------------------------------- +// cxoQueue_repr() +// Return a string representation of a queue. +//----------------------------------------------------------------------------- +static PyObject *cxoQueue_repr(cxoQueue *queue) +{ + PyObject *module, *name, *result; + + if (cxoUtils_getModuleAndName(Py_TYPE(queue), &module, &name) < 0) + return NULL; + result = cxoUtils_formatString("<%s.%s %r>", + PyTuple_Pack(3, module, name, queue->name)); + Py_DECREF(module); + Py_DECREF(name); + return result; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_deqHelper() +// Helper for dequeuing messages from a queue. +//----------------------------------------------------------------------------- +int cxoQueue_deqHelper(cxoQueue *queue, uint32_t *numProps, + cxoMsgProps **props) +{ + uint32_t bufferLength, i, j; + dpiMsgProps **handles; + dpiObject *objHandle; + const char *buffer; + cxoMsgProps *temp; + cxoObject *obj; + int ok; + + // use the same array to store the intermediate values provided by ODPI-C; + // by doing so there is no need to allocate an additional array and any + // values created by this helper routine are cleaned up on error + handles = (dpiMsgProps**) props; + + // perform dequeue + if (dpiQueue_deqMany(queue->handle, numProps, handles) < 0) + return cxoError_raiseAndReturnInt(); + + // create objects that are returned to the user + for (i = 0; i < *numProps; i++) { + + // create message property object + temp = cxoMsgProps_new(queue->conn, handles[i]); + ok = (temp) ? 1 : 0; + props[i] = temp; + + // get payload from ODPI-C message property + if (ok && dpiMsgProps_getPayload(temp->handle, &objHandle, &buffer, + &bufferLength) < 0) { + cxoError_raiseAndReturnInt(); + ok = 0; + } + + // store payload on cx_Oracle message property + if (ok && objHandle) { + obj = (cxoObject*) cxoObject_new(queue->payloadType, objHandle); + if (obj && dpiObject_addRef(objHandle) < 0) { + cxoError_raiseAndReturnInt(); + obj->handle = NULL; + Py_CLEAR(obj); + ok = 0; + } + temp->payload = (PyObject*) obj; + } else if (ok) { + temp->payload = PyBytes_FromStringAndSize(buffer, bufferLength); + } + + // if an error occurred, do some cleanup + if (!ok || !temp->payload) { + Py_XDECREF(temp); + for (j = 0; j < i; j++) + Py_DECREF(props[j]); + for (j = i + 1; j < *numProps; j++) + dpiMsgProps_release(handles[j]); + return -1; + } + + } + + return 0; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_enqHelper() +// Helper for enqueuing messages from a queue. +//----------------------------------------------------------------------------- +int cxoQueue_enqHelper(cxoQueue *queue, uint32_t numProps, + cxoMsgProps **props) +{ + dpiMsgProps **handles, *tempHandle; + cxoBuffer buffer; + cxoObject *obj; + uint32_t i; + int status; + + // use the same array to store the intermediate values required by ODPI-C; + // by doing so there is no need to allocate an additional array + handles = (dpiMsgProps**) props; + + // process array + for (i = 0; i < numProps; i++) { + + // verify that the message property object has a payload + if (!props[i]->payload || props[i]->payload == Py_None) { + cxoError_raiseFromString(cxoProgrammingErrorException, + "message has no payload"); + return -1; + } + + // transfer payload to message properties object + tempHandle = props[i]->handle; + if (PyObject_IsInstance(props[i]->payload, + (PyObject*) &cxoPyTypeObject)) { + obj = (cxoObject*) props[i]->payload; + if (dpiMsgProps_setPayloadObject(props[i]->handle, + obj->handle) < 0) + return cxoError_raiseAndReturnInt(); + } else { + if (cxoBuffer_fromObject(&buffer, props[i]->payload, + props[i]->encoding) < 0) + return -1; + status = dpiMsgProps_setPayloadBytes(props[i]->handle, buffer.ptr, + buffer.size); + cxoBuffer_clear(&buffer); + if (status < 0) + return cxoError_raiseAndReturnInt(); + } + handles[i] = tempHandle; + + } + + // perform enqueue + if (dpiQueue_enqMany(queue->handle, numProps, handles) < 0) + return cxoError_raiseAndReturnInt(); + + return 0; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_deqMany() +// Dequeue a single message to the queue. +//----------------------------------------------------------------------------- +static PyObject *cxoQueue_deqMany(cxoQueue *queue, PyObject *args) +{ + unsigned int numPropsFromPython; + uint32_t numProps, i; + cxoMsgProps **props; + PyObject *result; + + if (!PyArg_ParseTuple(args, "I", &numPropsFromPython)) + return NULL; + numProps = (uint32_t) numPropsFromPython; + props = PyMem_Malloc(numProps * sizeof(cxoMsgProps*)); + if (!props) + return NULL; + if (cxoQueue_deqHelper(queue, &numProps, props) < 0) { + PyMem_Free(props); + return NULL; + } + result = PyList_New(numProps); + if (!result) { + for (i = 0; i < numProps; i++) + Py_DECREF(props[i]); + PyMem_Free(props); + return NULL; + } + for (i = 0; i < numProps; i++) + PyList_SET_ITEM(result, i, (PyObject*) props[i]); + PyMem_Free(props); + return result; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_deqOne() +// Dequeue a single message to the queue. +//----------------------------------------------------------------------------- +static PyObject *cxoQueue_deqOne(cxoQueue *queue, PyObject *args) +{ + uint32_t numProps = 1; + cxoMsgProps *props; + + if (cxoQueue_deqHelper(queue, &numProps, &props) < 0) + return NULL; + if (numProps > 0) + return (PyObject*) props; + Py_RETURN_NONE; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_enqMany() +// Enqueue multiple messages to the queue. +//----------------------------------------------------------------------------- +static PyObject *cxoQueue_enqMany(cxoQueue *queue, PyObject *args) +{ + PyObject *seq, *seqCheck, *temp; + Py_ssize_t seqLength, i; + cxoMsgProps **props; + int status; + + // validate arguments + if (!PyArg_ParseTuple(args, "O", &seqCheck)) + return NULL; + seq = PySequence_Fast(seqCheck, "expecting sequence"); + if (!seq) + return NULL; + + // zero messages means nothing to do + seqLength = PySequence_Length(seq); + if (seqLength == 0) { + Py_DECREF(seq); + Py_RETURN_NONE; + } + + // populate array of properties + props = PyMem_Malloc(seqLength * sizeof(cxoMsgProps*)); + if (!props) { + PyErr_NoMemory(); + Py_DECREF(seq); + return NULL; + } + for (i = 0; i < seqLength; i++) { + temp = PySequence_Fast_GET_ITEM(seq, i); + if (Py_TYPE(temp) != &cxoPyTypeMsgProps) { + Py_DECREF(seq); + PyMem_Free(props); + PyErr_SetString(PyExc_TypeError, + "expecting sequence of message property objects"); + return NULL; + } + props[i] = (cxoMsgProps*) temp; + } + + // perform enqueue + status = cxoQueue_enqHelper(queue, (uint32_t) seqLength, props); + Py_DECREF(seq); + PyMem_Free(props); + if (status < 0) + return NULL; + + Py_RETURN_NONE; +} + + +//----------------------------------------------------------------------------- +// cxoQueue_enqOne() +// Enqueue a single message to the queue. +//----------------------------------------------------------------------------- +static PyObject *cxoQueue_enqOne(cxoQueue *queue, PyObject *args) +{ + cxoMsgProps *props; + + if (!PyArg_ParseTuple(args, "O!", &cxoPyTypeMsgProps, &props)) + return NULL; + if (cxoQueue_enqHelper(queue, 1, &props) < 0) + return NULL; + + Py_RETURN_NONE; +}