Added support for Advanced Queueing RAW queues and bulk enqueue/dequeue.

This commit is contained in:
Anthony Tuininga 2019-05-03 13:21:39 -06:00
parent 04a7dec0d4
commit d4498cf9e0
15 changed files with 967 additions and 93 deletions

View File

@ -4,6 +4,76 @@
Advanced Queuing
****************
.. note::
All of these objects are extensions to the DB API.
.. _queue:
------
Queues
------
Queues are created using the :meth:`Connection.queue()` method and are used to
enqueue and dequeue messages.
.. attribute:: Queue.connection
This read-only attribute returns a reference to the connection object on
which the queue was created.
.. method:: Queue.deqMany(maxMessages)
Dequeues up to the specified number of messages from the queue and returns
a list of these messages. Each element of the returned list is a
:ref:`message property<msgproperties>` object.
.. method:: Queue.deqOne()
Dequeues at most one message from the queue. If a message is dequeued, it
will be a :ref:`message property<msgproperties>` object; otherwise, it will
be the value None.
.. attribute:: Queue.deqOptions
This read-only attribute returns a reference to the :ref:`options
<deqoptions>` that will be used when dequeuing messages from the queue.
.. method:: Queue.enqOne(message)
Enqueues a single message into the queue. The message must be a
:ref:`message property<msgproperties>` object which has had its payload
attribute set to a value that the queue supports.
.. method:: Queue.enqMany(messages)
Enqueues multiple messages into the queue. The messages parameter must be a
sequence containing :ref:`message property <msgproperties>` objects which
have all had their payload attribute set to a value that the queue
supports.
.. attribute:: Queue.enqOptions
This read-only attribute returns a reference to the :ref:`options
<enqoptions>` that will be used when enqueuing messages into the queue.
.. attribute:: Queue.name
This read-only attribute returns the name of the queue.
.. attribute:: Queue.payloadType
This read-only attribute returns the object type for payloads that can be
enqueued and dequeued. If using a raw queue, this returns the value None.
.. _deqoptions:
---------------
@ -205,6 +275,14 @@ Message Properties
generated this message.
.. attribute:: MessageProperties.payload
This attribute identifies the payload that will be enqueued or the payload
that was dequeued when using a :ref:`queue <queue>`. When enqueuing, the
value is checked to ensure that it conforms to the type expected by that
queue.
.. attribute:: MessageProperties.priority
This attribute specifies the priority of the message. A smaller number

View File

@ -193,6 +193,11 @@ Connection Object
.. versionadded:: 5.3
.. deprecated:: 7.2
Use the methods :meth:`Queue.deqOne()` or :meth:`Queue.deqMany()`
instead.
.. note::
This method is an extension to the DB API definition.
@ -205,6 +210,10 @@ Connection Object
.. versionadded:: 5.3
.. deprecated:: 7.2
Use the attribute :attr:`Queue.deqOptions` instead.
.. note::
This method is an extension to the DB API definition.
@ -253,6 +262,11 @@ Connection Object
.. versionadded:: 5.3
.. deprecated:: 7.2
Use the methods :meth:`Queue.enqOne()` or :meth:`Queue.enqMany()`
instead.
.. note::
This method is an extension to the DB API definition.
@ -265,6 +279,10 @@ Connection Object
.. versionadded:: 5.3
.. deprecated:: 7.2
Use the attribute :attr:`Queue.enqOptions` instead.
.. note::
This method is an extension to the DB API definition.
@ -447,6 +465,25 @@ Connection Object
This method is an extension to the DB API definition.
.. method:: Connection.queue(name, payloadType=None)
Creates a :ref:`queue <queue>` which is used to enqueue and dequeue
messages in Advanced Queueing.
The name parameter is expected to be a string identifying the queue in
which messages are to be enqueued or dequeued.
The payloadType parameter, if specified, is expected to be an
:ref:`object type <objecttype>` that identifies the type of payload the
queue expects. If not specified, RAW data is enqueued and dequeued.
.. versionadded:: 7.2
.. note::
This method is an extension to the DB API definition.
.. method:: Connection.rollback()
Rollback any pending transactions.

2
odpi

@ -1 +1 @@
Subproject commit 0fe226c8b5b15b0cc42ab8e638f4bb875c78d479
Subproject commit c059aa8e878ed5aecb71359077a79d5beeb60da5

View File

@ -1,64 +0,0 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# AdvancedQueuing.py
# This script demonstrates how to use advanced queuing using cx_Oracle. It
# makes use of a simple type and queue created in the sample setup.
#
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
from __future__ import print_function
BOOK_TYPE_NAME = "UDT_BOOK"
QUEUE_NAME = "BOOKS"
QUEUE_TABLE_NAME = "BOOK_QUEUE"
import cx_Oracle
import SampleEnv
import decimal
# connect to database
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString())
cursor = connection.cursor()
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
booksType = connection.gettype(BOOK_TYPE_NAME)
book = booksType.newobject()
options = connection.deqoptions()
options.wait = cx_Oracle.DEQ_NO_WAIT
messageProperties = connection.msgproperties()
while connection.deq(QUEUE_NAME, options, messageProperties, book):
pass
# enqueue a few messages
book1 = booksType.newobject()
book1.TITLE = "The Fellowship of the Ring"
book1.AUTHORS = "Tolkien, J.R.R."
book1.PRICE = decimal.Decimal("10.99")
book2 = booksType.newobject()
book2.TITLE = "Harry Potter and the Philosopher's Stone"
book2.AUTHORS = "Rowling, J.K."
book2.PRICE = decimal.Decimal("7.99")
options = connection.enqoptions()
for book in (book1, book2):
print("Enqueuing book", book.TITLE)
connection.enq(QUEUE_NAME, options, messageProperties, book)
connection.commit()
# dequeue the messages
options = connection.deqoptions()
options.navigation = cx_Oracle.DEQ_FIRST_MSG
options.wait = cx_Oracle.DEQ_NO_WAIT
while connection.deq(QUEUE_NAME, options, messageProperties, book):
print("Dequeued book", book.TITLE)
connection.commit()

77
samples/BulkAQ.py Normal file
View File

@ -0,0 +1,77 @@
#------------------------------------------------------------------------------
# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# BulkAQ.py
# This script demonstrates how to use bulk enqueuing and dequeuing of
# messages with advanced queuing using cx_Oracle. It makes use of a RAW queue
# created in the sample setup.
#
# This script requires cx_Oracle 7.2 and higher.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import SampleEnv
QUEUE_NAME = "DEMORAW"
PAYLOAD_DATA = [
"The first message",
"The second message",
"The third message",
"The fourth message",
"The fifth message",
"The sixth message",
"The seventh message",
"The eighth message",
"The ninth message",
"The tenth message",
"The eleventh message",
"The twelfth and final message"
]
# connect to database
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString())
cursor = connection.cursor()
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqOne():
pass
# enqueue a few messages
print("Enqueuing messages...")
batchSize = 6
dataToEnq = PAYLOAD_DATA
while dataToEnq:
batchData = dataToEnq[:batchSize]
dataToEnq = dataToEnq[batchSize:]
messages = [connection.msgproperties(payload=d) for d in batchData]
for data in batchData:
print(data)
queue.enqMany(messages)
connection.commit()
# dequeue the messages
print("\nDequeuing messages...")
batchSize = 8
while True:
messages = queue.deqMany(batchSize)
if not messages:
break
for props in messages:
print(props.payload.decode())
connection.commit()
print("\nDone.")

68
samples/ObjectAQ.py Normal file
View File

@ -0,0 +1,68 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# ObjectAQ.py
# This script demonstrates how to use advanced queuing with objects using
# cx_Oracle. It makes use of a simple type and queue created in the sample
# setup.
#
# This script requires cx_Oracle 7.2 and higher.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import SampleEnv
import decimal
BOOK_TYPE_NAME = "UDT_BOOK"
QUEUE_NAME = "BOOKS"
BOOK_DATA = [
("The Fellowship of the Ring", "Tolkien, J.R.R.",
decimal.Decimal("10.99")),
("Harry Potter and the Philosopher's Stone", "Rowling, J.K.",
decimal.Decimal("7.99"))
]
# connect to database
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString())
cursor = connection.cursor()
# create queue
booksType = connection.gettype(BOOK_TYPE_NAME)
queue = connection.queue(QUEUE_NAME, booksType)
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqOne():
pass
# enqueue a few messages
print("Enqueuing messages...")
for title, authors, price in BOOK_DATA:
book = booksType.newobject()
book.TITLE = title
book.AUTHORS = authors
book.PRICE = price
print(title)
queue.enqOne(connection.msgproperties(payload=book))
connection.commit()
# dequeue the messages
print("\nDequeuing messages...")
while True:
props = queue.deqOne()
if not props:
break
print(props.payload.TITLE)
connection.commit()
print("\nDone.")

60
samples/RawAQ.py Normal file
View File

@ -0,0 +1,60 @@
#------------------------------------------------------------------------------
# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# RawAQ.py
# This script demonstrates how to use advanced queuing with RAW data using
# cx_Oracle. It makes use of a RAW queue created in the sample setup.
#
# This script requires cx_Oracle 7.2 and higher.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import SampleEnv
QUEUE_NAME = "DEMORAW"
PAYLOAD_DATA = [
"The first message",
"The second message",
"The third message",
"The fourth and final message"
]
# connect to database
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString())
cursor = connection.cursor()
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqOne():
pass
# enqueue a few messages
print("Enqueuing messages...")
for data in PAYLOAD_DATA:
print(data)
queue.enqOne(connection.msgproperties(payload=data))
connection.commit()
# dequeue the messages
print("\nDequeuing messages...")
while True:
props = queue.deqOne()
if not props:
break
print(props.payload.decode())
connection.commit()
print("\nDone.")

View File

@ -192,10 +192,16 @@ create table &main_user..PlsqlSessionCallbacks (
-- create queue table and queues for demonstrating advanced queuing
begin
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE',
'&main_user..UDT_BOOK');
dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE');
dbms_aqadm.start_queue('&main_user..BOOKS');
dbms_aqadm.create_queue_table('&main_user..RAW_QUEUE', 'RAW');
dbms_aqadm.create_queue('&main_user..DEMORAW', '&main_user..RAW_QUEUE');
dbms_aqadm.start_queue('&main_user..DEMORAW');
end;
/

View File

@ -41,10 +41,12 @@ static PyObject *cxoConnection_createLob(cxoConnection*, PyObject*);
static PyObject *cxoConnection_getStmtCacheSize(cxoConnection*, void*);
static PyObject *cxoConnection_newEnqueueOptions(cxoConnection*, PyObject*);
static PyObject *cxoConnection_newDequeueOptions(cxoConnection*, PyObject*);
static PyObject *cxoConnection_newMessageProperties(cxoConnection*, PyObject*);
static PyObject *cxoConnection_newMessageProperties(cxoConnection*, PyObject*,
PyObject*);
static PyObject *cxoConnection_dequeue(cxoConnection*, PyObject*, PyObject*);
static PyObject *cxoConnection_enqueue(cxoConnection*, PyObject*, PyObject*);
static PyObject *cxoConnection_ping(cxoConnection*, PyObject*);
static PyObject *cxoConnection_queue(cxoConnection*, PyObject*, PyObject*);
static PyObject *cxoConnection_shutdown(cxoConnection*, PyObject*, PyObject*);
static PyObject *cxoConnection_startup(cxoConnection*, PyObject*, PyObject*);
static PyObject *cxoConnection_subscribe(cxoConnection*, PyObject*, PyObject*);
@ -103,11 +105,13 @@ static PyMethodDef cxoConnectionMethods[] = {
{ "enqoptions", (PyCFunction) cxoConnection_newEnqueueOptions,
METH_NOARGS },
{ "msgproperties", (PyCFunction) cxoConnection_newMessageProperties,
METH_NOARGS },
METH_VARARGS | METH_KEYWORDS },
{ "deq", (PyCFunction) cxoConnection_dequeue,
METH_VARARGS | METH_KEYWORDS },
{ "enq", (PyCFunction) cxoConnection_enqueue,
METH_VARARGS | METH_KEYWORDS },
{ "queue", (PyCFunction) cxoConnection_queue,
METH_VARARGS | METH_KEYWORDS },
{ "createlob", (PyCFunction) cxoConnection_createLob, METH_O },
{ "getSodaDatabase", (PyCFunction) cxoConnection_getSodaDatabase,
METH_NOARGS },
@ -1321,7 +1325,7 @@ static PyObject *cxoConnection_cancel(cxoConnection *conn, PyObject *args)
static PyObject *cxoConnection_newEnqueueOptions(cxoConnection *conn,
PyObject *args)
{
return (PyObject*) cxoEnqOptions_new(conn);
return (PyObject*) cxoEnqOptions_new(conn, NULL);
}
@ -1332,7 +1336,7 @@ static PyObject *cxoConnection_newEnqueueOptions(cxoConnection *conn,
static PyObject *cxoConnection_newDequeueOptions(cxoConnection *conn,
PyObject *args)
{
return (PyObject*) cxoDeqOptions_new(conn);
return (PyObject*) cxoDeqOptions_new(conn, NULL);
}
@ -1341,9 +1345,98 @@ static PyObject *cxoConnection_newDequeueOptions(cxoConnection *conn,
// Creates a new message properties object and returns it.
//-----------------------------------------------------------------------------
static PyObject *cxoConnection_newMessageProperties(cxoConnection *conn,
PyObject *args)
PyObject *args, PyObject *keywordArgs)
{
return (PyObject*) cxoMsgProps_new(conn);
static char *keywordList[] = { "payload", "correlation", "delay",
"exceptionQ", "expiration", "priority", NULL };
PyObject *payloadObj, *correlationObj, *exceptionQObj;
int delay, expiration, priority, status;
cxoMsgProps *props;
cxoBuffer buffer;
// parse arguments
expiration = -1;
delay = priority = 0;
payloadObj = correlationObj = exceptionQObj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|OOiOii", keywordList,
&payloadObj, &correlationObj, &delay, &exceptionQObj, &expiration,
&priority))
return NULL;
// create new message properties object
props = cxoMsgProps_new(conn, NULL);
if (!props)
return NULL;
// set payload, if applicable
if (payloadObj) {
Py_INCREF(payloadObj);
props->payload = payloadObj;
}
// set correlation, if applicable
if (correlationObj) {
if (cxoBuffer_fromObject(&buffer, correlationObj,
props->encoding) < 0) {
Py_DECREF(props);
return NULL;
}
status = dpiMsgProps_setCorrelation(props->handle, buffer.ptr,
buffer.size);
cxoBuffer_clear(&buffer);
if (status < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(props);
return NULL;
}
}
// set delay, if applicable
if (delay != 0) {
if (dpiMsgProps_setDelay(props->handle, (int32_t) delay) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(props);
return NULL;
}
}
// set exception queue, if applicable
if (exceptionQObj) {
if (cxoBuffer_fromObject(&buffer, exceptionQObj,
props->encoding) < 0) {
Py_DECREF(props);
return NULL;
}
status = dpiMsgProps_setExceptionQ(props->handle, buffer.ptr,
buffer.size);
cxoBuffer_clear(&buffer);
if (status < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(props);
return NULL;
}
}
// set expiration, if applicable
if (expiration != -1) {
if (dpiMsgProps_setExpiration(props->handle,
(int32_t) expiration) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(props);
return NULL;
}
}
// set priority, if applicable
if (priority != 0) {
if (dpiMsgProps_setPriority(props->handle, (int32_t) priority) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(props);
return NULL;
}
}
return (PyObject*) props;
}
@ -1431,6 +1524,49 @@ static PyObject *cxoConnection_enqueue(cxoConnection *conn, PyObject* args,
}
//-----------------------------------------------------------------------------
// cxoConnection_queue()
// Creates a new queue associated with the connection and returns it to the
// caller.
//-----------------------------------------------------------------------------
static PyObject *cxoConnection_queue(cxoConnection *conn, PyObject* args,
PyObject* keywordArgs)
{
static char *keywordList[] = { "name", "type", NULL };
cxoObjectType *typeObj;
cxoBuffer nameBuffer;
PyObject *nameObj;
dpiQueue *handle;
cxoQueue *queue;
int status;
// parse arguments
typeObj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "O|O!", keywordList,
&nameObj, &cxoPyTypeObjectType, &typeObj))
return NULL;
if (cxoBuffer_fromObject(&nameBuffer, nameObj,
conn->encodingInfo.encoding) < 0)
return NULL;
// create queue
status = dpiConn_newQueue(conn->handle, nameBuffer.ptr, nameBuffer.size,
(typeObj) ? typeObj->handle : NULL, &handle);
cxoBuffer_clear(&nameBuffer);
if (status < 0)
return cxoError_raiseAndReturnNull();
queue = cxoQueue_new(conn, handle);
if (!queue)
return NULL;
Py_INCREF(nameObj);
queue->name = nameObj;
Py_XINCREF(typeObj);
queue->payloadType = typeObj;
return (PyObject*) queue;
}
//-----------------------------------------------------------------------------
// cxoConnection_contextManagerEnter()
// Called when the connection is used as a context manager and simply returns

View File

@ -118,19 +118,27 @@ PyTypeObject cxoPyTypeDeqOptions = {
// cxoDeqOptions_new()
// Create a new dequeue options object.
//-----------------------------------------------------------------------------
cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection)
cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection,
dpiDeqOptions *handle)
{
cxoDeqOptions *options;
int status;
options = (cxoDeqOptions*)
cxoPyTypeDeqOptions.tp_alloc(&cxoPyTypeDeqOptions, 0);
if (!options)
return NULL;
if (dpiConn_newDeqOptions(connection->handle, &options->handle) < 0) {
Py_DECREF(options);
if (handle) {
status = dpiDeqOptions_addRef(handle);
} else {
status = dpiConn_newDeqOptions(connection->handle, &handle);
}
if (status < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(options);
return NULL;
}
options->handle = handle;
options->encoding = connection->encodingInfo.encoding;
return options;

View File

@ -90,22 +90,30 @@ PyTypeObject cxoPyTypeEnqOptions = {
// cxoEnqOptions_new()
// Create a new enqueue options object.
//-----------------------------------------------------------------------------
cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection)
cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection,
dpiEnqOptions *handle)
{
cxoEnqOptions *self;
cxoEnqOptions *options;
int status;
self = (cxoEnqOptions*)
options = (cxoEnqOptions*)
cxoPyTypeEnqOptions.tp_alloc(&cxoPyTypeEnqOptions, 0);
if (!self)
if (!options)
return NULL;
if (dpiConn_newEnqOptions(connection->handle, &self->handle) < 0) {
Py_DECREF(self);
if (handle) {
status = dpiEnqOptions_addRef(handle);
} else {
status = dpiConn_newEnqOptions(connection->handle, &handle);
}
if (status < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(options);
return NULL;
}
self->encoding = connection->encodingInfo.encoding;
options->handle = handle;
options->encoding = connection->encodingInfo.encoding;
return self;
return options;
}

View File

@ -277,6 +277,7 @@ static PyObject *cxoModule_initialize(void)
CXO_MAKE_TYPE_READY(&cxoPyTypeObject);
CXO_MAKE_TYPE_READY(&cxoPyTypeObjectType);
CXO_MAKE_TYPE_READY(&cxoPyTypeObjectVar);
CXO_MAKE_TYPE_READY(&cxoPyTypeQueue);
CXO_MAKE_TYPE_READY(&cxoPyTypeRowidVar);
CXO_MAKE_TYPE_READY(&cxoPyTypeSessionPool);
CXO_MAKE_TYPE_READY(&cxoPyTypeSodaCollection);

View File

@ -79,6 +79,7 @@ typedef struct cxoMsgProps cxoMsgProps;
typedef struct cxoObject cxoObject;
typedef struct cxoObjectAttr cxoObjectAttr;
typedef struct cxoObjectType cxoObjectType;
typedef struct cxoQueue cxoQueue;
typedef struct cxoSessionPool cxoSessionPool;
typedef struct cxoSodaCollection cxoSodaCollection;
typedef struct cxoSodaDatabase cxoSodaDatabase;
@ -140,6 +141,7 @@ extern PyTypeObject cxoPyTypeObject;
extern PyTypeObject cxoPyTypeObjectAttr;
extern PyTypeObject cxoPyTypeObjectType;
extern PyTypeObject cxoPyTypeObjectVar;
extern PyTypeObject cxoPyTypeQueue;
extern PyTypeObject cxoPyTypeRowidVar;
extern PyTypeObject cxoPyTypeSessionPool;
extern PyTypeObject cxoPyTypeSodaCollection;
@ -319,6 +321,7 @@ struct cxoMessageTable {
struct cxoMsgProps {
PyObject_HEAD
dpiMsgProps *handle;
PyObject *payload;
const char *encoding;
};
@ -351,6 +354,16 @@ struct cxoObjectType {
char isCollection;
};
struct cxoQueue {
PyObject_HEAD
cxoConnection *conn;
dpiQueue *handle;
PyObject *name;
PyObject *deqOptions;
PyObject *enqOptions;
cxoObjectType *payloadType;
};
struct cxoSessionPool {
PyObject_HEAD
dpiPool *handle;
@ -462,9 +475,11 @@ int cxoCursor_performBind(cxoCursor *cursor);
int cxoCursor_setBindVariables(cxoCursor *cursor, PyObject *parameters,
unsigned numElements, unsigned arrayPos, int deferTypeAssignment);
cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection);
cxoDeqOptions *cxoDeqOptions_new(cxoConnection *connection,
dpiDeqOptions *handle);
cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection);
cxoEnqOptions *cxoEnqOptions_new(cxoConnection *connection,
dpiEnqOptions *handle);
cxoError *cxoError_newFromInfo(dpiErrorInfo *errorInfo);
int cxoError_raiseAndReturnInt(void);
@ -476,7 +491,7 @@ PyObject *cxoError_raiseFromString(PyObject *exceptionType,
PyObject *cxoLob_new(cxoConnection *connection, dpiOracleTypeNum oracleTypeNum,
dpiLob *handle);
cxoMsgProps *cxoMsgProps_new(cxoConnection*);
cxoMsgProps *cxoMsgProps_new(cxoConnection*, dpiMsgProps *handle);
int cxoObject_internalExtend(cxoObject *obj, PyObject *sequence);
PyObject *cxoObject_new(cxoObjectType *objectType, dpiObject *handle);
@ -489,6 +504,8 @@ cxoObjectType *cxoObjectType_new(cxoConnection *connection,
cxoObjectType *cxoObjectType_newByName(cxoConnection *connection,
PyObject *name);
cxoQueue *cxoQueue_new(cxoConnection *conn, dpiQueue *handle);
cxoSodaCollection *cxoSodaCollection_new(cxoSodaDatabase *db,
dpiSodaColl *handle);

View File

@ -15,7 +15,7 @@
#include "cxoModule.h"
//-----------------------------------------------------------------------------
// Declaration of methods used for message properties
// forward declarations
//-----------------------------------------------------------------------------
static void cxoMsgProps_free(cxoMsgProps*);
static PyObject *cxoMsgProps_getNumAttempts(cxoMsgProps*, void*);
@ -37,9 +37,18 @@ static int cxoMsgProps_setPriority(cxoMsgProps*, PyObject*, void*);
//-----------------------------------------------------------------------------
// declaration of calculated members for Python type "MessageProperties"
// declaration of members
//-----------------------------------------------------------------------------
static PyGetSetDef cxoMsgPropsCalcMembers[] = {
static PyMemberDef cxoMembers[] = {
{ "payload", T_OBJECT, offsetof(cxoMsgProps, payload), 0 },
{ NULL }
};
//-----------------------------------------------------------------------------
// declaration of calculated members
//-----------------------------------------------------------------------------
static PyGetSetDef cxoCalcMembers[] = {
{ "attempts", (getter) cxoMsgProps_getNumAttempts, 0, 0, 0 },
{ "correlation", (getter) cxoMsgProps_getCorrelation,
(setter) cxoMsgProps_setCorrelation, 0, 0 },
@ -92,8 +101,8 @@ PyTypeObject cxoPyTypeMsgProps = {
0, // tp_iter
0, // tp_iternext
0, // tp_methods
0, // tp_members
cxoMsgPropsCalcMembers, // tp_getset
cxoMembers, // tp_members
cxoCalcMembers, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
@ -112,18 +121,22 @@ PyTypeObject cxoPyTypeMsgProps = {
// cxoMsgProps_new()
// Create a new message properties object.
//-----------------------------------------------------------------------------
cxoMsgProps *cxoMsgProps_new(cxoConnection *connection)
cxoMsgProps *cxoMsgProps_new(cxoConnection *connection, dpiMsgProps *handle)
{
cxoMsgProps *props;
props = (cxoMsgProps*) cxoPyTypeMsgProps.tp_alloc(&cxoPyTypeMsgProps, 0);
if (!props)
if (!props) {
if (handle)
dpiMsgProps_release(handle);
return NULL;
if (dpiConn_newMsgProps(connection->handle, &props->handle) < 0) {
}
if (!handle && dpiConn_newMsgProps(connection->handle, &handle) < 0) {
Py_DECREF(props);
cxoError_raiseAndReturnNull();
return NULL;
}
props->handle = handle;
props->encoding = connection->encodingInfo.encoding;
return props;

429
src/cxoQueue.c Normal file
View File

@ -0,0 +1,429 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
// cxoQueue.c
// Defines the routines for handling queues (advanced queuing). These queues
// permit sending and receiving messages defined by the database.
//-----------------------------------------------------------------------------
#include "cxoModule.h"
//-----------------------------------------------------------------------------
// Declaration of functions
//-----------------------------------------------------------------------------
static void cxoQueue_free(cxoQueue*);
static PyObject *cxoQueue_repr(cxoQueue*);
static PyObject *cxoQueue_deqMany(cxoQueue*, PyObject*);
static PyObject *cxoQueue_deqOne(cxoQueue*, PyObject*);
static PyObject *cxoQueue_enqMany(cxoQueue*, PyObject*);
static PyObject *cxoQueue_enqOne(cxoQueue*, PyObject*);
//-----------------------------------------------------------------------------
// declaration of methods
//-----------------------------------------------------------------------------
static PyMethodDef cxoMethods[] = {
{ "deqMany", (PyCFunction) cxoQueue_deqMany, METH_VARARGS },
{ "deqOne", (PyCFunction) cxoQueue_deqOne, METH_NOARGS },
{ "enqMany", (PyCFunction) cxoQueue_enqMany, METH_VARARGS },
{ "enqOne", (PyCFunction) cxoQueue_enqOne, METH_VARARGS },
{ NULL }
};
//-----------------------------------------------------------------------------
// declaration of members
//-----------------------------------------------------------------------------
static PyMemberDef cxoMembers[] = {
{ "connection", T_OBJECT, offsetof(cxoQueue, conn), READONLY },
{ "deqOptions", T_OBJECT, offsetof(cxoQueue, deqOptions), READONLY },
{ "enqOptions", T_OBJECT, offsetof(cxoQueue, enqOptions), READONLY },
{ "name", T_OBJECT, offsetof(cxoQueue, name), READONLY },
{ "payloadType", T_OBJECT, offsetof(cxoQueue, payloadType), READONLY },
{ NULL }
};
//-----------------------------------------------------------------------------
// Python type declarations
//-----------------------------------------------------------------------------
PyTypeObject cxoPyTypeQueue = {
PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.Queue", // tp_name
sizeof(cxoQueue), // tp_basicsize
0, // tp_itemsize
(destructor) cxoQueue_free, // tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
(reprfunc) cxoQueue_repr, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
0, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0, // tp_doc
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
cxoMethods, // tp_methods
cxoMembers, // tp_members
0, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
0, // tp_init
0, // tp_alloc
0, // tp_new
0, // tp_free
0, // tp_is_gc
0 // tp_bases
};
//-----------------------------------------------------------------------------
// cxoQueue_new()
// Create a new queue (advanced queuing).
//-----------------------------------------------------------------------------
cxoQueue *cxoQueue_new(cxoConnection *conn, dpiQueue *handle)
{
dpiDeqOptions *deqOptions;
dpiEnqOptions *enqOptions;
cxoQueue *queue;
// create queue and populate basic attributes
queue = (cxoQueue*) cxoPyTypeQueue.tp_alloc(&cxoPyTypeQueue, 0);
if (!queue) {
dpiQueue_release(handle);
return NULL;
}
Py_INCREF(conn);
queue->conn = conn;
queue->handle = handle;
// get dequeue options
if (dpiQueue_getDeqOptions(queue->handle, &deqOptions) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(queue);
return NULL;
}
queue->deqOptions = (PyObject*) cxoDeqOptions_new(conn, deqOptions);
if (!queue->deqOptions) {
Py_DECREF(queue);
return NULL;
}
// get enqueue options
if (dpiQueue_getEnqOptions(queue->handle, &enqOptions) < 0) {
cxoError_raiseAndReturnNull();
Py_DECREF(queue);
return NULL;
}
queue->enqOptions = (PyObject*) cxoEnqOptions_new(conn, enqOptions);
if (!queue->enqOptions) {
Py_DECREF(queue);
return NULL;
}
return queue;
}
//-----------------------------------------------------------------------------
// cxoQueue_free()
// Free the memory associated with a queue.
//-----------------------------------------------------------------------------
static void cxoQueue_free(cxoQueue *queue)
{
if (queue->handle) {
dpiQueue_release(queue->handle);
queue->handle = NULL;
}
Py_CLEAR(queue->conn);
Py_CLEAR(queue->name);
Py_CLEAR(queue->payloadType);
Py_CLEAR(queue->deqOptions);
Py_CLEAR(queue->enqOptions);
Py_TYPE(queue)->tp_free((PyObject*) queue);
}
//-----------------------------------------------------------------------------
// cxoQueue_repr()
// Return a string representation of a queue.
//-----------------------------------------------------------------------------
static PyObject *cxoQueue_repr(cxoQueue *queue)
{
PyObject *module, *name, *result;
if (cxoUtils_getModuleAndName(Py_TYPE(queue), &module, &name) < 0)
return NULL;
result = cxoUtils_formatString("<%s.%s %r>",
PyTuple_Pack(3, module, name, queue->name));
Py_DECREF(module);
Py_DECREF(name);
return result;
}
//-----------------------------------------------------------------------------
// cxoQueue_deqHelper()
// Helper for dequeuing messages from a queue.
//-----------------------------------------------------------------------------
int cxoQueue_deqHelper(cxoQueue *queue, uint32_t *numProps,
cxoMsgProps **props)
{
uint32_t bufferLength, i, j;
dpiMsgProps **handles;
dpiObject *objHandle;
const char *buffer;
cxoMsgProps *temp;
cxoObject *obj;
int ok;
// use the same array to store the intermediate values provided by ODPI-C;
// by doing so there is no need to allocate an additional array and any
// values created by this helper routine are cleaned up on error
handles = (dpiMsgProps**) props;
// perform dequeue
if (dpiQueue_deqMany(queue->handle, numProps, handles) < 0)
return cxoError_raiseAndReturnInt();
// create objects that are returned to the user
for (i = 0; i < *numProps; i++) {
// create message property object
temp = cxoMsgProps_new(queue->conn, handles[i]);
ok = (temp) ? 1 : 0;
props[i] = temp;
// get payload from ODPI-C message property
if (ok && dpiMsgProps_getPayload(temp->handle, &objHandle, &buffer,
&bufferLength) < 0) {
cxoError_raiseAndReturnInt();
ok = 0;
}
// store payload on cx_Oracle message property
if (ok && objHandle) {
obj = (cxoObject*) cxoObject_new(queue->payloadType, objHandle);
if (obj && dpiObject_addRef(objHandle) < 0) {
cxoError_raiseAndReturnInt();
obj->handle = NULL;
Py_CLEAR(obj);
ok = 0;
}
temp->payload = (PyObject*) obj;
} else if (ok) {
temp->payload = PyBytes_FromStringAndSize(buffer, bufferLength);
}
// if an error occurred, do some cleanup
if (!ok || !temp->payload) {
Py_XDECREF(temp);
for (j = 0; j < i; j++)
Py_DECREF(props[j]);
for (j = i + 1; j < *numProps; j++)
dpiMsgProps_release(handles[j]);
return -1;
}
}
return 0;
}
//-----------------------------------------------------------------------------
// cxoQueue_enqHelper()
// Helper for enqueuing messages from a queue.
//-----------------------------------------------------------------------------
int cxoQueue_enqHelper(cxoQueue *queue, uint32_t numProps,
cxoMsgProps **props)
{
dpiMsgProps **handles, *tempHandle;
cxoBuffer buffer;
cxoObject *obj;
uint32_t i;
int status;
// use the same array to store the intermediate values required by ODPI-C;
// by doing so there is no need to allocate an additional array
handles = (dpiMsgProps**) props;
// process array
for (i = 0; i < numProps; i++) {
// verify that the message property object has a payload
if (!props[i]->payload || props[i]->payload == Py_None) {
cxoError_raiseFromString(cxoProgrammingErrorException,
"message has no payload");
return -1;
}
// transfer payload to message properties object
tempHandle = props[i]->handle;
if (PyObject_IsInstance(props[i]->payload,
(PyObject*) &cxoPyTypeObject)) {
obj = (cxoObject*) props[i]->payload;
if (dpiMsgProps_setPayloadObject(props[i]->handle,
obj->handle) < 0)
return cxoError_raiseAndReturnInt();
} else {
if (cxoBuffer_fromObject(&buffer, props[i]->payload,
props[i]->encoding) < 0)
return -1;
status = dpiMsgProps_setPayloadBytes(props[i]->handle, buffer.ptr,
buffer.size);
cxoBuffer_clear(&buffer);
if (status < 0)
return cxoError_raiseAndReturnInt();
}
handles[i] = tempHandle;
}
// perform enqueue
if (dpiQueue_enqMany(queue->handle, numProps, handles) < 0)
return cxoError_raiseAndReturnInt();
return 0;
}
//-----------------------------------------------------------------------------
// cxoQueue_deqMany()
// Dequeue a single message to the queue.
//-----------------------------------------------------------------------------
static PyObject *cxoQueue_deqMany(cxoQueue *queue, PyObject *args)
{
unsigned int numPropsFromPython;
uint32_t numProps, i;
cxoMsgProps **props;
PyObject *result;
if (!PyArg_ParseTuple(args, "I", &numPropsFromPython))
return NULL;
numProps = (uint32_t) numPropsFromPython;
props = PyMem_Malloc(numProps * sizeof(cxoMsgProps*));
if (!props)
return NULL;
if (cxoQueue_deqHelper(queue, &numProps, props) < 0) {
PyMem_Free(props);
return NULL;
}
result = PyList_New(numProps);
if (!result) {
for (i = 0; i < numProps; i++)
Py_DECREF(props[i]);
PyMem_Free(props);
return NULL;
}
for (i = 0; i < numProps; i++)
PyList_SET_ITEM(result, i, (PyObject*) props[i]);
PyMem_Free(props);
return result;
}
//-----------------------------------------------------------------------------
// cxoQueue_deqOne()
// Dequeue a single message to the queue.
//-----------------------------------------------------------------------------
static PyObject *cxoQueue_deqOne(cxoQueue *queue, PyObject *args)
{
uint32_t numProps = 1;
cxoMsgProps *props;
if (cxoQueue_deqHelper(queue, &numProps, &props) < 0)
return NULL;
if (numProps > 0)
return (PyObject*) props;
Py_RETURN_NONE;
}
//-----------------------------------------------------------------------------
// cxoQueue_enqMany()
// Enqueue multiple messages to the queue.
//-----------------------------------------------------------------------------
static PyObject *cxoQueue_enqMany(cxoQueue *queue, PyObject *args)
{
PyObject *seq, *seqCheck, *temp;
Py_ssize_t seqLength, i;
cxoMsgProps **props;
int status;
// validate arguments
if (!PyArg_ParseTuple(args, "O", &seqCheck))
return NULL;
seq = PySequence_Fast(seqCheck, "expecting sequence");
if (!seq)
return NULL;
// zero messages means nothing to do
seqLength = PySequence_Length(seq);
if (seqLength == 0) {
Py_DECREF(seq);
Py_RETURN_NONE;
}
// populate array of properties
props = PyMem_Malloc(seqLength * sizeof(cxoMsgProps*));
if (!props) {
PyErr_NoMemory();
Py_DECREF(seq);
return NULL;
}
for (i = 0; i < seqLength; i++) {
temp = PySequence_Fast_GET_ITEM(seq, i);
if (Py_TYPE(temp) != &cxoPyTypeMsgProps) {
Py_DECREF(seq);
PyMem_Free(props);
PyErr_SetString(PyExc_TypeError,
"expecting sequence of message property objects");
return NULL;
}
props[i] = (cxoMsgProps*) temp;
}
// perform enqueue
status = cxoQueue_enqHelper(queue, (uint32_t) seqLength, props);
Py_DECREF(seq);
PyMem_Free(props);
if (status < 0)
return NULL;
Py_RETURN_NONE;
}
//-----------------------------------------------------------------------------
// cxoQueue_enqOne()
// Enqueue a single message to the queue.
//-----------------------------------------------------------------------------
static PyObject *cxoQueue_enqOne(cxoQueue *queue, PyObject *args)
{
cxoMsgProps *props;
if (!PyArg_ParseTuple(args, "O!", &cxoPyTypeMsgProps, &props))
return NULL;
if (cxoQueue_enqHelper(queue, 1, &props) < 0)
return NULL;
Py_RETURN_NONE;
}