From 0f3b0f17c001715836848d82423a5d7ba27319b8 Mon Sep 17 00:00:00 2001 From: Anthony Tuininga Date: Tue, 1 Mar 2016 15:34:05 -0700 Subject: [PATCH] Added support for using the C API for advanced queuing. --- AQ.c | 760 +++++++++++++++++++++++++++++++++++++ Connection.c | 161 ++++++++ cx_Oracle.c | 31 ++ doc/aq.rst | 218 +++++++++++ doc/connection.rst | 66 ++++ doc/index.rst | 1 + doc/module.rst | 161 ++++++++ samples/AdvancedQueuing.py | 51 +-- setup.py | 2 +- 9 files changed, 1406 insertions(+), 45 deletions(-) create mode 100644 AQ.c create mode 100644 doc/aq.rst diff --git a/AQ.c b/AQ.c new file mode 100644 index 0000000..842a7cc --- /dev/null +++ b/AQ.c @@ -0,0 +1,760 @@ +//----------------------------------------------------------------------------- +// AQ.c +// Implements the enqueue and dequeue options and message properties objects +// used in Advanced Queuing. +//----------------------------------------------------------------------------- + +//----------------------------------------------------------------------------- +// structures used for handling AQ options and message properties +//----------------------------------------------------------------------------- +typedef struct { + PyObject_HEAD + udt_Environment *environment; + OCIAQEnqOptions *handle; +} udt_EnqOptions; + +typedef struct { + PyObject_HEAD + udt_Environment *environment; + OCIAQDeqOptions *handle; +} udt_DeqOptions; + +typedef struct { + PyObject_HEAD + udt_Environment *environment; + OCIAQMsgProperties *handle; +} udt_MessageProperties; + + +//----------------------------------------------------------------------------- +// Declaration of methods used for enqueue options +//----------------------------------------------------------------------------- +static udt_EnqOptions *EnqOptions_New(udt_Environment*); +static void EnqOptions_Free(udt_EnqOptions*); +static PyObject *EnqOptions_GetOCIAttr(udt_EnqOptions*, ub4*); +static int EnqOptions_SetOCIAttr(udt_EnqOptions*, PyObject*, ub4*); + + +//----------------------------------------------------------------------------- +// Declaration of methods used for dequeue options +//----------------------------------------------------------------------------- +static udt_DeqOptions *DeqOptions_New(udt_Environment*); +static void DeqOptions_Free(udt_DeqOptions*); +static PyObject *DeqOptions_GetOCIAttr(udt_DeqOptions*, ub4*); +static int DeqOptions_SetOCIAttr(udt_DeqOptions*, PyObject*, ub4*); + + +//----------------------------------------------------------------------------- +// Declaration of methods used for message properties +//----------------------------------------------------------------------------- +static udt_MessageProperties *MessageProperties_New(udt_Environment*); +static void MessageProperties_Free(udt_MessageProperties*); +static PyObject *MessageProperties_GetOCIAttr(udt_MessageProperties*, ub4*); +static int MessageProperties_SetOCIAttr(udt_MessageProperties*, PyObject*, + ub4*); + + +//----------------------------------------------------------------------------- +// constants for OCI attributes +//----------------------------------------------------------------------------- +static ub4 gc_AQAttempts = OCI_ATTR_ATTEMPTS; +static ub4 gc_AQConsumerName = OCI_ATTR_CONSUMER_NAME; +static ub4 gc_AQCorrelation = OCI_ATTR_CORRELATION; +static ub4 gc_AQDelay = OCI_ATTR_DELAY; +static ub4 gc_AQDeliveryMode = OCI_ATTR_MSG_DELIVERY_MODE; +static ub4 gc_AQDeqCondition = OCI_ATTR_DEQCOND; +static ub4 gc_AQDeqMode = OCI_ATTR_DEQ_MODE; +static ub4 gc_AQDeqMsgId = OCI_ATTR_DEQ_MSGID; +static ub4 gc_AQEnqTime = OCI_ATTR_ENQ_TIME; +static ub4 gc_AQExceptionQ = OCI_ATTR_EXCEPTION_QUEUE; +static ub4 gc_AQExpiration = OCI_ATTR_EXPIRATION; +static ub4 gc_AQNavigation = OCI_ATTR_NAVIGATION; +static ub4 gc_AQOriginalMsgId = OCI_ATTR_ORIGINAL_MSGID; +static ub4 gc_AQPriority = OCI_ATTR_PRIORITY; +static ub4 gc_AQState = OCI_ATTR_MSG_STATE; +static ub4 gc_AQTransformation = OCI_ATTR_TRANSFORMATION; +static ub4 gc_AQVisibility = OCI_ATTR_VISIBILITY; +static ub4 gc_AQWait = OCI_ATTR_WAIT; + + +//----------------------------------------------------------------------------- +// declaration of calculated members for Python type "EnqOptions" +//----------------------------------------------------------------------------- +static PyGetSetDef g_EnqOptionsCalcMembers[] = { + { "deliverymode", 0, (setter) EnqOptions_SetOCIAttr, 0, + &gc_AQDeliveryMode }, + { "transformation", (getter) EnqOptions_GetOCIAttr, + (setter) EnqOptions_SetOCIAttr, 0, &gc_AQTransformation }, + { "visibility", (getter) EnqOptions_GetOCIAttr, + (setter) EnqOptions_SetOCIAttr, 0, &gc_AQVisibility }, + { NULL } +}; + + +//----------------------------------------------------------------------------- +// declaration of calculated members for Python type "DeqOptions" +//----------------------------------------------------------------------------- +static PyGetSetDef g_DeqOptionsCalcMembers[] = { + { "condition", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQDeqCondition }, + { "consumername", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQConsumerName }, + { "correlation", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQCorrelation }, + { "deliverymode", 0, (setter) EnqOptions_SetOCIAttr, 0, + &gc_AQDeliveryMode }, + { "mode", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQDeqMode }, + { "msgid", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQDeqMsgId }, + { "navigation", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQNavigation }, + { "transformation", (getter) EnqOptions_GetOCIAttr, + (setter) EnqOptions_SetOCIAttr, 0, &gc_AQTransformation }, + { "visibility", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQVisibility }, + { "wait", (getter) DeqOptions_GetOCIAttr, + (setter) DeqOptions_SetOCIAttr, 0, &gc_AQWait }, + { NULL } +}; + + +//----------------------------------------------------------------------------- +// declaration of calculated members for Python type "MessageProperties" +//----------------------------------------------------------------------------- +static PyGetSetDef g_MessagePropertiesCalcMembers[] = { + { "attempts", (getter) MessageProperties_GetOCIAttr, 0, 0, + &gc_AQAttempts }, + { "correlation", (getter) MessageProperties_GetOCIAttr, + (setter) MessageProperties_SetOCIAttr, 0, &gc_AQCorrelation }, + { "delay", (getter) MessageProperties_GetOCIAttr, + (setter) MessageProperties_SetOCIAttr, 0, &gc_AQDelay }, + { "deliverymode", (getter) MessageProperties_GetOCIAttr, 0, 0, + &gc_AQDeliveryMode }, + { "enqtime", (getter) MessageProperties_GetOCIAttr, 0, 0, &gc_AQEnqTime }, + { "exceptionq", (getter) MessageProperties_GetOCIAttr, + (setter) MessageProperties_SetOCIAttr, 0, &gc_AQExceptionQ }, + { "expiration", (getter) MessageProperties_GetOCIAttr, + (setter) MessageProperties_SetOCIAttr, 0, &gc_AQExpiration }, + { "msgid", (getter) MessageProperties_GetOCIAttr, + (setter) MessageProperties_SetOCIAttr, 0, &gc_AQOriginalMsgId }, + { "priority", (getter) MessageProperties_GetOCIAttr, + (setter) MessageProperties_SetOCIAttr, 0, &gc_AQPriority }, + { "state", (getter) MessageProperties_GetOCIAttr, 0, 0, &gc_AQState }, + { NULL } +}; + + +//----------------------------------------------------------------------------- +// Python type declarations +//----------------------------------------------------------------------------- +static PyTypeObject g_EnqOptionsType = { + PyVarObject_HEAD_INIT(NULL, 0) + "cx_Oracle.EnqOptions", // tp_name + sizeof(udt_EnqOptions), // tp_basicsize + 0, // tp_itemsize + (destructor) EnqOptions_Free, // tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + 0, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + 0, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT, // tp_flags + 0, // tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + 0, // tp_methods + 0, // tp_members + g_EnqOptionsCalcMembers, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + 0, // tp_init + 0, // tp_alloc + 0, // tp_new + 0, // tp_free + 0, // tp_is_gc + 0 // tp_bases +}; + + +static PyTypeObject g_DeqOptionsType = { + PyVarObject_HEAD_INIT(NULL, 0) + "cx_Oracle.DeqOptions", // tp_name + sizeof(udt_DeqOptions), // tp_basicsize + 0, // tp_itemsize + (destructor) DeqOptions_Free, // tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + 0, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + 0, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT, // tp_flags + 0, // tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + 0, // tp_methods + 0, // tp_members + g_DeqOptionsCalcMembers, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + 0, // tp_init + 0, // tp_alloc + 0, // tp_new + 0, // tp_free + 0, // tp_is_gc + 0 // tp_bases +}; + + +static PyTypeObject g_MessagePropertiesType = { + PyVarObject_HEAD_INIT(NULL, 0) + "cx_Oracle.MessageProperties", // tp_name + sizeof(udt_MessageProperties), // tp_basicsize + 0, // tp_itemsize + (destructor) MessageProperties_Free,// tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + 0, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + 0, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT, // tp_flags + 0, // tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + 0, // tp_methods + 0, // tp_members + g_MessagePropertiesCalcMembers, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + 0, // tp_init + 0, // tp_alloc + 0, // tp_new + 0, // tp_free + 0, // tp_is_gc + 0 // tp_bases +}; + + +//----------------------------------------------------------------------------- +// EnqOptions_New() +// Create a new enqueue options object. +//----------------------------------------------------------------------------- +static udt_EnqOptions *EnqOptions_New( + udt_Environment *env) // environment in which to create +{ + udt_EnqOptions *self; + sword status; + + self = (udt_EnqOptions*) g_EnqOptionsType.tp_alloc(&g_EnqOptionsType, 0); + if (!self) + return NULL; + Py_INCREF(env); + self->environment = env; + status = OCIDescriptorAlloc(env->handle, (dvoid**) &self->handle, + OCI_DTYPE_AQENQ_OPTIONS, 0, 0); + if (Environment_CheckForError(env, status, "EnqOptions_New()") < 0) { + Py_DECREF(self); + return NULL; + } + + return self; +} + + +//----------------------------------------------------------------------------- +// EnqOptions_Free() +// Free the memory associated with the enqueue options object. +//----------------------------------------------------------------------------- +static void EnqOptions_Free( + udt_EnqOptions *self) // object to free +{ + if (self->handle) + OCIDescriptorFree(self->handle, OCI_DTYPE_AQENQ_OPTIONS); + Py_CLEAR(self->environment); + Py_TYPE(self)->tp_free((PyObject*) self); +} + + +//----------------------------------------------------------------------------- +// EnqOptions_GetOCIAttr() +// Get the value of the OCI attribute. +//----------------------------------------------------------------------------- +static PyObject *EnqOptions_GetOCIAttr( + udt_EnqOptions *self, // options object + ub4 *attribute) // OCI attribute type +{ + ub4 valueLength, ub4Value; + dvoid *ociValue = NULL; + char *textValue; + sword status; + + // get the value from the OCI + switch (*attribute) { + case OCI_ATTR_VISIBILITY: + ociValue = &ub4Value; + break; + case OCI_ATTR_TRANSFORMATION: + ociValue = &textValue; + break; + }; + status = OCIAttrGet(self->handle, OCI_DTYPE_AQENQ_OPTIONS, ociValue, + &valueLength, *attribute, self->environment->errorHandle); + if (Environment_CheckForError(self->environment, status, + "EnqOptions_GetOCIAttr()") < 0) + return NULL; + if (*attribute == gc_AQTransformation) { + if (!textValue) + Py_RETURN_NONE; + return cxString_FromEncodedString(textValue, valueLength, + self->environment->encoding); + } + return PyInt_FromLong(ub4Value); +} + + +//----------------------------------------------------------------------------- +// EnqOptions_SetOCIAttr() +// Set the value of the OCI attribute. +//----------------------------------------------------------------------------- +static int EnqOptions_SetOCIAttr( + udt_EnqOptions *self, // options object + PyObject *value, // value to set + ub4 *attribute) // OCI attribute type +{ + dvoid *ociValue = NULL; + ub4 valueLength = 0; + udt_Buffer buffer; + ub4 ub4Value; + ub2 ub2Value; + sword status; + + switch (*attribute) { + case OCI_ATTR_MSG_DELIVERY_MODE: + ub2Value = (ub2) PyInt_AsLong(value); + if (PyErr_Occurred()) + return -1; + ociValue = &ub2Value; + break; + case OCI_ATTR_VISIBILITY: + ub4Value = (ub4) PyInt_AsLong(value); + if (PyErr_Occurred()) + return -1; + ociValue = &ub4Value; + break; + case OCI_ATTR_TRANSFORMATION: + if (cxBuffer_FromObject(&buffer, value, + self->environment->encoding) < 0) + return -1; + ociValue = (dvoid*) buffer.ptr; + valueLength = buffer.size; + break; + }; + status = OCIAttrSet(self->handle, OCI_DTYPE_AQENQ_OPTIONS, + ociValue, valueLength, *attribute, self->environment->errorHandle); + if (*attribute == gc_AQTransformation) + cxBuffer_Clear(&buffer); + if (Environment_CheckForError(self->environment, status, + "EnqOptions_SetOCIAttr()") < 0) + return -1; + return 0; +} + + +//----------------------------------------------------------------------------- +// DeqOptions_New() +// Create a new dequeue options object. +//----------------------------------------------------------------------------- +static udt_DeqOptions *DeqOptions_New( + udt_Environment *env) // environment in which to create +{ + udt_DeqOptions *self; + sword status; + + self = (udt_DeqOptions*) g_DeqOptionsType.tp_alloc(&g_DeqOptionsType, 0); + if (!self) + return NULL; + Py_INCREF(env); + self->environment = env; + status = OCIDescriptorAlloc(env->handle, (dvoid**) &self->handle, + OCI_DTYPE_AQDEQ_OPTIONS, 0, 0); + if (Environment_CheckForError(env, status, "DeqOptions_New()") < 0) { + Py_DECREF(self); + return NULL; + } + + return self; +} + + +//----------------------------------------------------------------------------- +// DeqOptions_Free() +// Free the memory associated with the dequeue options object. +//----------------------------------------------------------------------------- +static void DeqOptions_Free( + udt_DeqOptions *self) // object to free +{ + if (self->handle) + OCIDescriptorFree(self->handle, OCI_DTYPE_AQDEQ_OPTIONS); + Py_CLEAR(self->environment); + Py_TYPE(self)->tp_free((PyObject*) self); +} + + +//----------------------------------------------------------------------------- +// DeqOptions_GetOCIAttr() +// Get the value of the OCI attribute. +//----------------------------------------------------------------------------- +static PyObject *DeqOptions_GetOCIAttr( + udt_DeqOptions *self, // options object + ub4 *attribute) // OCI attribute type +{ + ub4 valueLength, ub4Value; + dvoid *ociValue = NULL; + char *rawValuePtr; + OCIRaw *rawValue; + char *textValue; + sword status; + + // get the value from the OCI + switch (*attribute) { + case OCI_ATTR_DEQ_MODE: + case OCI_ATTR_NAVIGATION: + case OCI_ATTR_VISIBILITY: + case OCI_ATTR_WAIT: + ociValue = &ub4Value; + break; + case OCI_ATTR_CONSUMER_NAME: + case OCI_ATTR_CORRELATION: + case OCI_ATTR_DEQCOND: + case OCI_ATTR_TRANSFORMATION: + ociValue = &textValue; + break; + case OCI_ATTR_DEQ_MSGID: + rawValue = NULL; + ociValue = &rawValue; + break; + }; + status = OCIAttrGet(self->handle, OCI_DTYPE_AQDEQ_OPTIONS, ociValue, + &valueLength, *attribute, self->environment->errorHandle); + if (Environment_CheckForError(self->environment, status, + "DeqOptions_GetOCIAttr()") < 0) + return NULL; + if (ociValue == &textValue) { + if (!textValue) + Py_RETURN_NONE; + return cxString_FromEncodedString(textValue, valueLength, + self->environment->encoding); + } else if (ociValue == &rawValue) { + if (!rawValue) + Py_RETURN_NONE; + rawValuePtr = (char*) OCIRawPtr(self->environment->handle, rawValue); + valueLength = OCIRawSize(self->environment->handle, rawValue); + return PyBytes_FromStringAndSize(rawValuePtr, valueLength); + } + return PyInt_FromLong(ub4Value); +} + + +//----------------------------------------------------------------------------- +// DeqOptions_SetOCIAttr() +// Set the value of the OCI attribute. +//----------------------------------------------------------------------------- +static int DeqOptions_SetOCIAttr( + udt_DeqOptions *self, // options object + PyObject *value, // value to set + ub4 *attribute) // OCI attribute type +{ + Py_ssize_t rawValueLength; + OCIRaw *rawValue = NULL; + dvoid *ociValue = NULL; + ub4 valueLength = 0; + udt_Buffer buffer; + char *rawValuePtr; + ub4 ub4Value; + ub2 ub2Value; + sword status; + + cxBuffer_Init(&buffer); + switch (*attribute) { + case OCI_ATTR_MSG_DELIVERY_MODE: + ub2Value = (ub2) PyInt_AsLong(value); + if (PyErr_Occurred()) + return -1; + ociValue = &ub2Value; + break; + case OCI_ATTR_DEQ_MODE: + case OCI_ATTR_NAVIGATION: + case OCI_ATTR_VISIBILITY: + case OCI_ATTR_WAIT: + ub4Value = (ub4) PyInt_AsLong(value); + if (PyErr_Occurred()) + return -1; + ociValue = &ub4Value; + break; + case OCI_ATTR_CONSUMER_NAME: + case OCI_ATTR_CORRELATION: + case OCI_ATTR_DEQCOND: + case OCI_ATTR_TRANSFORMATION: + if (cxBuffer_FromObject(&buffer, value, + self->environment->encoding) < 0) + return -1; + ociValue = (dvoid*) buffer.ptr; + valueLength = buffer.size; + break; + case OCI_ATTR_DEQ_MSGID: + if (PyBytes_AsStringAndSize(value, &rawValuePtr, + &rawValueLength) < 0) + return -1; + status = OCIRawAssignBytes(self->environment->handle, + self->environment->errorHandle, (const ub1*) rawValuePtr, + (ub4) rawValueLength, &rawValue); + if (Environment_CheckForError(self->environment, status, + "DeqOptions_SetOCIAttr(): assign raw value") < 0) + return -1; + ociValue = (dvoid*) rawValue; + break; + }; + status = OCIAttrSet(self->handle, OCI_DTYPE_AQDEQ_OPTIONS, + ociValue, valueLength, *attribute, self->environment->errorHandle); + cxBuffer_Clear(&buffer); + if (rawValue) + OCIRawResize(self->environment->handle, self->environment->errorHandle, + 0, &rawValue); + if (Environment_CheckForError(self->environment, status, + "DeqOptions_SetOCIAttr(): set value") < 0) + return -1; + return 0; +} + + +//----------------------------------------------------------------------------- +// MessageProperties_New() +// Create a new dequeue options object. +//----------------------------------------------------------------------------- +static udt_MessageProperties *MessageProperties_New( + udt_Environment *env) // environment in which to create +{ + udt_MessageProperties *self; + sword status; + + self = (udt_MessageProperties*) + g_MessagePropertiesType.tp_alloc(&g_MessagePropertiesType, 0); + if (!self) + return NULL; + Py_INCREF(env); + self->environment = env; + status = OCIDescriptorAlloc(env->handle, (dvoid**) &self->handle, + OCI_DTYPE_AQMSG_PROPERTIES, 0, 0); + if (Environment_CheckForError(env, status, + "MessageProperties_New()") < 0) { + Py_DECREF(self); + return NULL; + } + + return self; +} + + +//----------------------------------------------------------------------------- +// MessageProperties_Free() +// Free the memory associated with the message properties object. +//----------------------------------------------------------------------------- +static void MessageProperties_Free( + udt_MessageProperties *self) // object to free +{ + if (self->handle) + OCIDescriptorFree(self->handle, OCI_DTYPE_AQMSG_PROPERTIES); + Py_CLEAR(self->environment); + Py_TYPE(self)->tp_free((PyObject*) self); +} + + +//----------------------------------------------------------------------------- +// MessageProperties_GetOCIAttr() +// Get the value of the OCI attribute. +//----------------------------------------------------------------------------- +static PyObject *MessageProperties_GetOCIAttr( + udt_MessageProperties *self, // options object + ub4 *attribute) // OCI attribute type +{ + ub4 valueLength, ub4Value; + dvoid *ociValue = NULL; + char *rawValuePtr; + OCIDate dateValue; + OCIRaw *rawValue; + char *textValue; + sb4 sb4Value; + ub2 ub2Value; + sword status; + + // get the value from the OCI + switch (*attribute) { + case OCI_ATTR_MSG_DELIVERY_MODE: + ociValue = &ub2Value; + break; + case OCI_ATTR_ATTEMPTS: + case OCI_ATTR_DELAY: + case OCI_ATTR_EXPIRATION: + case OCI_ATTR_PRIORITY: + ociValue = &sb4Value; + break; + case OCI_ATTR_MSG_STATE: + ociValue = &ub4Value; + break; + case OCI_ATTR_CORRELATION: + case OCI_ATTR_EXCEPTION_QUEUE: + ociValue = &textValue; + break; + case OCI_ATTR_ENQ_TIME: + ociValue = &dateValue; + break; + case OCI_ATTR_ORIGINAL_MSGID: + rawValue = NULL; + ociValue = &rawValue; + break; + }; + status = OCIAttrGet(self->handle, OCI_DTYPE_AQMSG_PROPERTIES, ociValue, + &valueLength, *attribute, self->environment->errorHandle); + if (Environment_CheckForError(self->environment, status, + "MessageProperties_GetOCIAttr()") < 0) + return NULL; + if (ociValue == &textValue) { + if (!textValue) + Py_RETURN_NONE; + return cxString_FromEncodedString(textValue, valueLength, + self->environment->encoding); + } else if (ociValue == &rawValue) { + if (!rawValue) + Py_RETURN_NONE; + rawValuePtr = (char*) OCIRawPtr(self->environment->handle, rawValue); + valueLength = OCIRawSize(self->environment->handle, rawValue); + return PyBytes_FromStringAndSize(rawValuePtr, valueLength); + } else if (ociValue == &dateValue) + return OracleDateToPythonDate(&vt_DateTime, &dateValue); + else if (ociValue == &ub2Value) + return PyInt_FromLong(ub2Value); + else if (ociValue == &sb4Value) + return PyInt_FromLong(sb4Value); + return PyInt_FromLong(ub4Value); +} + + +//----------------------------------------------------------------------------- +// MessageProperties_SetOCIAttr() +// Set the value of the OCI attribute. +//----------------------------------------------------------------------------- +static int MessageProperties_SetOCIAttr( + udt_MessageProperties *self, // options object + PyObject *value, // value to set + ub4 *attribute) // OCI attribute type +{ + Py_ssize_t rawValueLength; + OCIRaw *rawValue = NULL; + dvoid *ociValue = NULL; + ub4 valueLength = 0; + udt_Buffer buffer; + char *rawValuePtr; + ub4 ub4Value; + ub2 ub2Value; + sword status; + + cxBuffer_Init(&buffer); + switch (*attribute) { + case OCI_ATTR_MSG_DELIVERY_MODE: + ub2Value = (ub2) PyInt_AsLong(value); + if (PyErr_Occurred()) + return -1; + ociValue = &ub2Value; + break; + case OCI_ATTR_DEQ_MODE: + case OCI_ATTR_NAVIGATION: + case OCI_ATTR_VISIBILITY: + case OCI_ATTR_WAIT: + ub4Value = (ub4) PyInt_AsLong(value); + if (PyErr_Occurred()) + return -1; + ociValue = &ub4Value; + break; + case OCI_ATTR_CONSUMER_NAME: + case OCI_ATTR_CORRELATION: + case OCI_ATTR_DEQCOND: + case OCI_ATTR_TRANSFORMATION: + if (cxBuffer_FromObject(&buffer, value, + self->environment->encoding) < 0) + return -1; + ociValue = (dvoid*) buffer.ptr; + valueLength = buffer.size; + break; + case OCI_ATTR_DEQ_MSGID: + if (PyBytes_AsStringAndSize(value, &rawValuePtr, + &rawValueLength) < 0) + return -1; + status = OCIRawAssignBytes(self->environment->handle, + self->environment->errorHandle, (const ub1*) rawValuePtr, + (ub4) rawValueLength, &rawValue); + if (Environment_CheckForError(self->environment, status, + "MessageProperties_SetOCIAttr(): assign raw value") < 0) + return -1; + ociValue = (dvoid*) rawValue; + break; + }; + status = OCIAttrSet(self->handle, OCI_DTYPE_AQMSG_PROPERTIES, + ociValue, valueLength, *attribute, self->environment->errorHandle); + cxBuffer_Clear(&buffer); + if (rawValue) + OCIRawResize(self->environment->handle, self->environment->errorHandle, + 0, &rawValue); + if (Environment_CheckForError(self->environment, status, + "MessageProperties_SetOCIAttr(): set value") < 0) + return -1; + return 0; +} + diff --git a/Connection.c b/Connection.c index 0adeb57..f5aa2ed 100644 --- a/Connection.c +++ b/Connection.c @@ -66,6 +66,11 @@ static PyObject *Connection_ContextManagerExit(udt_Connection*, PyObject*); static PyObject *Connection_ChangePasswordExternal(udt_Connection*, PyObject*); static PyObject *Connection_GetType(udt_Connection*, PyObject*); static PyObject *Connection_GetStmtCacheSize(udt_Connection*, void*); +static PyObject *Connection_NewEnqueueOptions(udt_Connection*, PyObject*); +static PyObject *Connection_NewDequeueOptions(udt_Connection*, PyObject*); +static PyObject *Connection_NewMessageProperties(udt_Connection*, PyObject*); +static PyObject *Connection_Dequeue(udt_Connection*, PyObject*, PyObject*); +static PyObject *Connection_Enqueue(udt_Connection*, PyObject*, PyObject*); static int Connection_SetStmtCacheSize(udt_Connection*, PyObject*, void*); #if ORACLE_VERSION_HEX >= ORACLE_VERSION(10, 2) static PyObject *Connection_GetOCIAttr(udt_Connection*, ub4*); @@ -114,6 +119,12 @@ static PyMethodDef g_ConnectionMethods[] = { { "changepassword", (PyCFunction) Connection_ChangePasswordExternal, METH_VARARGS }, { "gettype", (PyCFunction) Connection_GetType, METH_VARARGS }, + { "deqoptions", (PyCFunction) Connection_NewDequeueOptions, METH_NOARGS }, + { "enqoptions", (PyCFunction) Connection_NewEnqueueOptions, METH_NOARGS }, + { "msgproperties", (PyCFunction) Connection_NewMessageProperties, + METH_NOARGS }, + { "deq", (PyCFunction) Connection_Dequeue, METH_VARARGS | METH_KEYWORDS }, + { "enq", (PyCFunction) Connection_Enqueue, METH_VARARGS | METH_KEYWORDS }, { NULL } }; @@ -798,6 +809,7 @@ static int Connection_Connect( #if ORACLE_VERSION_HEX >= ORACLE_VERSION(10, 2) #include "Subscription.c" #endif +#include "AQ.c" //----------------------------------------------------------------------------- @@ -1557,6 +1569,155 @@ static PyObject *Connection_Cancel( } +//----------------------------------------------------------------------------- +// Connection_NewEnqueueOptions() +// Creates a new enqueue options object and returns it. +//----------------------------------------------------------------------------- +static PyObject *Connection_NewEnqueueOptions( + udt_Connection *self, // connection + PyObject *args) // none +{ + return (PyObject*) EnqOptions_New(self->environment); +} + + +//----------------------------------------------------------------------------- +// Connection_NewDequeueOptions() +// Creates a new dequeue options object and returns it. +//----------------------------------------------------------------------------- +static PyObject *Connection_NewDequeueOptions( + udt_Connection *self, // connection + PyObject *args) // none +{ + return (PyObject*) DeqOptions_New(self->environment); +} + + +//----------------------------------------------------------------------------- +// Connection_NewMessageProperties() +// Creates a new message properties object and returns it. +//----------------------------------------------------------------------------- +static PyObject *Connection_NewMessageProperties( + udt_Connection *self, // connection + PyObject *args) // none +{ + return (PyObject*) MessageProperties_New(self->environment); +} + + +//----------------------------------------------------------------------------- +// Connection_Dequeue() +// Dequeues a message using Advanced Queuing capabilities. The message ID is +// returned if a message is available or None if no message is available. +//----------------------------------------------------------------------------- +static PyObject *Connection_Dequeue( + udt_Connection *self, // connection + PyObject* args, // arguments + PyObject* keywordArgs) // keyword arguments +{ + static char *keywordList[] = { "name", "options", "msgproperties", + "payload", NULL }; + PyObject *nameObj, *excType, *excValue, *traceback; + udt_MessageProperties *propertiesObj; + udt_DeqOptions *optionsObj; + udt_Object *payloadObj; + udt_Buffer nameBuffer; + char *messageIdValue; + OCIRaw *messageId; + ub4 messageIdSize; + udt_Error *error; + sword status; + + // parse arguments + if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "OO!O!O!", keywordList, + &nameObj, &g_DeqOptionsType, &optionsObj, &g_MessagePropertiesType, + &propertiesObj, &g_ObjectType, &payloadObj)) + return NULL; + if (cxBuffer_FromObject(&nameBuffer, nameObj, + self->environment->encoding) < 0) + return NULL; + + // enqueue payload + messageId = NULL; + status = OCIAQDeq(self->handle, self->environment->errorHandle, + (oratext*) nameBuffer.ptr, optionsObj->handle, + propertiesObj->handle, payloadObj->objectType->tdo, + &payloadObj->instance, &payloadObj->indicator, &messageId, + OCI_DEFAULT); + cxBuffer_Clear(&nameBuffer); + if (Environment_CheckForError(self->environment, status, + "Connection_Dequeue()") < 0) { + PyErr_Fetch(&excType, &excValue, &traceback); + if (excValue) { + error = (udt_Error*) excValue; + if (error->code == 25228) { + Py_XDECREF(excType); + Py_XDECREF(excValue); + Py_XDECREF(traceback); + Py_RETURN_NONE; + } + } + PyErr_Restore(excType, excValue, traceback); + return NULL; + } + + // determine the message id + messageIdValue = (char*) OCIRawPtr(self->environment->handle, messageId); + messageIdSize = OCIRawSize(self->environment->handle, messageId); + return PyBytes_FromStringAndSize(messageIdValue, messageIdSize); +} + + +//----------------------------------------------------------------------------- +// Connection_Enqueue() +// Enqueues a message using Advanced Queuing capabilities. The message ID is +// returned. +//----------------------------------------------------------------------------- +static PyObject *Connection_Enqueue( + udt_Connection *self, // connection + PyObject* args, // arguments + PyObject* keywordArgs) // keyword arguments +{ + static char *keywordList[] = { "name", "options", "msgproperties", + "payload", NULL }; + udt_MessageProperties *propertiesObj; + udt_EnqOptions *optionsObj; + udt_Object *payloadObj; + udt_Buffer nameBuffer; + char *messageIdValue; + PyObject *nameObj; + OCIRaw *messageId; + ub4 messageIdSize; + sword status; + + // parse arguments + if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "OO!O!O!", keywordList, + &nameObj, &g_EnqOptionsType, &optionsObj, &g_MessagePropertiesType, + &propertiesObj, &g_ObjectType, &payloadObj)) + return NULL; + if (cxBuffer_FromObject(&nameBuffer, nameObj, + self->environment->encoding) < 0) + return NULL; + + // enqueue payload + messageId = NULL; + status = OCIAQEnq(self->handle, self->environment->errorHandle, + (oratext*) nameBuffer.ptr, optionsObj->handle, + propertiesObj->handle, payloadObj->objectType->tdo, + &payloadObj->instance, &payloadObj->indicator, &messageId, + OCI_DEFAULT); + cxBuffer_Clear(&nameBuffer); + if (Environment_CheckForError(self->environment, status, + "Connection_Enqueue()") < 0) + return NULL; + + // determine the message id + messageIdValue = (char*) OCIRawPtr(self->environment->handle, messageId); + messageIdSize = OCIRawSize(self->environment->handle, messageId); + return PyBytes_FromStringAndSize(messageIdValue, messageIdSize); +} + + //----------------------------------------------------------------------------- // Connection_RegisterCallback() // Register a callback for the OCI function. diff --git a/cx_Oracle.c b/cx_Oracle.c index 4005488..12950c6 100644 --- a/cx_Oracle.c +++ b/cx_Oracle.c @@ -358,6 +358,9 @@ static PyObject *Module_Initialize(void) MAKE_TYPE_READY(&g_ObjectAttributeType); MAKE_TYPE_READY(&g_ExternalLobVarType); MAKE_TYPE_READY(&g_ObjectType); + MAKE_TYPE_READY(&g_EnqOptionsType); + MAKE_TYPE_READY(&g_DeqOptionsType); + MAKE_TYPE_READY(&g_MessagePropertiesType); #if ORACLE_VERSION_HEX >= ORACLE_VERSION(10, 2) MAKE_TYPE_READY(&g_SubscriptionType); MAKE_TYPE_READY(&g_MessageType); @@ -442,6 +445,10 @@ static PyObject *Module_Initialize(void) ADD_TYPE_OBJECT("SessionPool", &g_SessionPoolType) ADD_TYPE_OBJECT("_Error", &g_ErrorType) ADD_TYPE_OBJECT("Object", &g_ObjectType) + ADD_TYPE_OBJECT("ObjectType", &g_ObjectTypeType) + ADD_TYPE_OBJECT("EnqOptions", &g_EnqOptionsType) + ADD_TYPE_OBJECT("DeqOptions", &g_DeqOptionsType) + ADD_TYPE_OBJECT("MessageProperties", &g_MessagePropertiesType) // the name "connect" is required by the DB API ADD_TYPE_OBJECT("connect", &g_ConnectionType) @@ -556,6 +563,30 @@ static PyObject *Module_Initialize(void) ADD_OCI_CONSTANT(SUBSCR_QOS_HAREG) #endif + // add constants for advanced queueing + ADD_OCI_CONSTANT(DEQ_BROWSE) + ADD_OCI_CONSTANT(DEQ_FIRST_MSG) + ADD_OCI_CONSTANT(DEQ_IMMEDIATE) + ADD_OCI_CONSTANT(DEQ_LOCKED) + ADD_OCI_CONSTANT(DEQ_NEXT_MSG) + ADD_OCI_CONSTANT(DEQ_NEXT_TRANSACTION) + ADD_OCI_CONSTANT(DEQ_NO_WAIT) + ADD_OCI_CONSTANT(DEQ_ON_COMMIT) + ADD_OCI_CONSTANT(DEQ_REMOVE) + ADD_OCI_CONSTANT(DEQ_REMOVE_NODATA) + ADD_OCI_CONSTANT(DEQ_WAIT_FOREVER) + ADD_OCI_CONSTANT(ENQ_IMMEDIATE) + ADD_OCI_CONSTANT(ENQ_ON_COMMIT) + ADD_OCI_CONSTANT(MSG_NO_DELAY) + ADD_OCI_CONSTANT(MSG_NO_EXPIRATION) + ADD_OCI_CONSTANT(MSG_PERSISTENT) + ADD_OCI_CONSTANT(MSG_BUFFERED) + ADD_OCI_CONSTANT(MSG_EXPIRED) + ADD_OCI_CONSTANT(MSG_PERSISTENT_OR_BUFFERED) + ADD_OCI_CONSTANT(MSG_READY) + ADD_OCI_CONSTANT(MSG_PROCESSED) + ADD_OCI_CONSTANT(MSG_WAITING) + return module; } diff --git a/doc/aq.rst b/doc/aq.rst new file mode 100644 index 0000000..91465fb --- /dev/null +++ b/doc/aq.rst @@ -0,0 +1,218 @@ +.. _aq: + +**************** +Advanced Queuing +**************** + +.. _deqoptions: + +--------------- +Dequeue Options +--------------- + +.. note:: + + This object is an extension to the DB API. It is returned by the + :meth:`~Connection.deqoptions()` call and is used in calls to + :meth:`~Connection.deq()`. + + +.. attribute:: DeqOptions.condition + + This attribute specifies a boolean expression similar to the where clause + of a SQL query. The boolean expression can include conditions on message + properties, user data properties and PL/SQL or SQL functions. + + +.. attribute:: DeqOptions.consumername + + This attribute specifies the name of the consumer. Only messages matching + the consumer name will be accessed. If the queue is not set up for multiple + consumers this attribute should not be set. + + +.. attribute:: DeqOptions.correlation + + This attribute specifies the correlation identifier of the message to be + dequeued. Special pattern-matching characters, such as the percent sign (%) + and the underscore (_), can be used. If multiple messages satisfy the + pattern, the order of dequeuing is indeterminate. + + +.. attribute:: DeqOptions.deliverymode + + This write-only attribute specifies whether persistent + (:data:`cx_Oracle.MSG_PERSISTENT`), buffered + (:data:`cx_Oracle.MSG_BUFFERED`) or both + (:data:`cx_Oracle.MSG_PERSISTENT_OR_BUFFERED`) types of messages should + be dequeued. + + +.. attribute:: DeqOptions.mode + + This attribute specifies the locking behaviour associated with the dequeue + operation. The valid values are :data:`cx_Oracle.DEQ_BROWSE` (the message is + read without acquiring any lock on the message), + :data:`cx_Oracle.DEQ_LOCKED` (the message is read and a write lock is + obtained), :data:`cx_Oracle.DEQ_REMOVE` (the message is read and updated + or deleted, which is also the default value) or + :data:`cx_Oracle.DEQ_REMOVE_NODATA` (which confirms receipt of the message + but does not deliver the actual message content). + + +.. attribute:: DeqOptions.msgid + + This attribute specifies the identifier of the message to be dequeued. + + +.. attribute:: DeqOptions.navigation + + This attribute specifies the position of the message that is retrieved. + The valid values are :data:`cx_Oracle.DEQ_FIRST_MSG` (retrieves the first + available message that matches the search criteria), + :data:`cx_Oracle.DEQ_NEXT_MSG` (retrieves the next available message that + matches the search criteria, which is also the default value), or + :data:`cx_Oracle.DEQ_NEXT_TRANSACTION` (skips the remainder of the current + transaction group and retrieves the first message of the next transaction + group). + + +.. attribute:: DeqOptions.transformation + + This attribute specifies the name of the transformation that must be applied + after the message is enqueued from the database but before it is returned to + the calling application. The transformation must be created using + dbms_transform. + + +.. attribute:: DeqOptions.visibility + + This attribute specifies the transactional behavior of the enqueue request. + It must be one of :data:`cx_Oracle.ENQ_ON_COMMIT` indicating that the + enqueue is part of the current transaction (the default) or + :data:`cx_Oracle.ENQ_IMMEDIATE` indicating that the enqueue operation + constitutes a transaction of its own. This parameter is ignored when using + the DEQ_BROWSE mode. + + +.. _enqoptions: + +--------------- +Enqueue Options +--------------- + +.. note:: + + This object is an extension to the DB API. It is returned by the + :meth:`~Connection.enqoptions()` call and is used in calls to + :meth:`~Connection.enq()`. + + +.. attribute:: EnqOptions.deliverymode + + This write-only attribute specifies whether persistent + (:data:`cx_Oracle.MSG_PERSISTENT`) or buffered + (:data:`cx_Oracle.MSG_BUFFERED`) messages should be enqueued. + + +.. attribute:: EnqOptions.transformation + + This attribute specifies the name of the transformation that must be applied + before the message is enqueued into the database. The transformation must + be created using dbms_transform. + + +.. attribute:: EnqOptions.visibility + + This attribute specifies the transactional behavior of the enqueue request. + It must be one of :data:`cx_Oracle.ENQ_ON_COMMIT` indicating that the + enqueue is part of the current transaction (the default) or + :data:`cx_Oracle.ENQ_IMMEDIATE` indicating that the enqueue operation + constitutes a transaction of its own. + + +.. _msgproperties: + +------------------ +Message Properties +------------------ + +.. note:: + + This object is an extension to the DB API. It is returned by the + :meth:`~Connection.msgproperties()` call and is used in calls to + :meth:`~Connection.deq` and :meth:`~Connection.enq()`. + + +.. attribute:: MessageProperties.attempts + + This read-only attribute specifies the number of attempts that have been + made to dequeue the message. + + +.. attribute:: MessageProperties.correlation + + This attribute specifies the correlation used when the message was enqueued. + + +.. attribute:: MessageProperties.delay + + This attribute specifies the number of seconds to delay an enqueued message. + Any integer is acceptable but the constant :data:`cx_Oracle.MSG_NO_DELAY` + can also be used indicating that the message is available for immediate + dequeuing. + + +.. attribute:: EnqOptions.deliverymode + + This read-only attribute specifies whether a persistent + (:data:`cx_Oracle.MSG_PERSISTENT`) or buffered + (:data:`cx_Oracle.MSG_BUFFERED`) message was dequeued. + + +.. attribute:: EnqOptions.enqtime + + This read-only attribute specifies the time that the message was enqueued. + + +.. attribute:: EnqOptions.exceptionq + + This attribute specifies the name of the queue to which the message is + moved if it cannot be processed successfully. Messages are moved if the + number of unsuccessful dequeue attempts has exceeded the maximum number of + retries or if the message has expired. All messages in the exception queue + are in the EXPIRED state. The default value is the name of the exception + queue associated with the queue table. + + +.. attribute:: EnqOptions.expiration + + This attribute specifies, in seconds, how long the message is available for + dequeuing. This attribute is an offset from the delay attribute. Expiration + processing requires the queue monitor to be running. Any integer is accepted + but the constant :data:`cx_Oracle.MSG_NO_EXPIRATION` can also be used + indicating that the message never expires. + + +.. attribute:: EnqOptions.msgid + + This attribute specifies the id of the message in the last queue that + generated this message. + + +.. attribute:: EnqOptions.priority + + This attribute specifies the priority of the message. A smaller number + indicates a higher priority. The priority can be any integer, including + negative numbers. The default value is zero. + + +.. attribute:: EnqOptions.state + + This read-only attribute specifies the state of the message at the time of + the dequeue. It will be one of :data:`cx_Oracle.MSG_WAITING` (the message + delay has not yet been reached), :data:`cx_Oracle.MSG_READY` (the message + is ready to be processed), :data:`cx_Oracle.MSG_PROCESSED` (the message + has been processed and is retained) or :data:`cx_Oracle.MSG_EXPIRED` + (the message has been moved to the exception queue). + diff --git a/doc/connection.rst b/doc/connection.rst index 248cc4b..1b203b7 100644 --- a/doc/connection.rst +++ b/doc/connection.rst @@ -135,6 +135,33 @@ Connection Object Return a new Cursor object (:ref:`cursorobj`) using the connection. +.. method:: Connection.deq(name, options, msgproperties, payload) + + Returns a message id after successfully dequeuing a message. The options + object can be created using :meth:`~Connection.deqoptions()` and the + msgproperties object can be created using + :meth:`~Connection.msgproperties()`. The payload must be an object created + using :meth:`~ObjectType.newobject()`. + + .. versionadded:: development + + .. note:: + + This method is an extension to the DB API definition. + + +.. method:: Connection.deqoptions() + + Returns an object specifying the options to use when dequeuing messages. + See :ref:`deqoptions` for more information. + + .. versionadded:: development + + .. note:: + + This method is an extension to the DB API definition. + + .. attribute:: Connection.dsn This read-only attribute returns the TNS entry of the database to which a @@ -169,6 +196,33 @@ Connection Object available in Python 2.x when not built in unicode mode. +.. method:: Connection.enq(name, options, msgproperties, payload) + + Returns a message id after successfully enqueuing a message. The options + object can be created using :meth:`~Connection.enqoptions()` and the + msgproperties object can be created using + :meth:`~Connection.msgproperties()`. The payload must be an object created + using :meth:`~ObjectType.newobject()`. + + .. versionadded:: development + + .. note:: + + This method is an extension to the DB API definition. + + +.. method:: Connection.enqoptions() + + Returns an object specifying the options to use when enqueuing messages. + See :ref:`enqoptions` for more information. + + .. versionadded:: development + + .. note:: + + This method is an extension to the DB API definition. + + .. method:: Connection.gettype(name) Return a type object (:ref:`objecttype`) given its name. This can then be @@ -233,6 +287,18 @@ Connection Object This attribute is an extension to the DB API definition. +.. method:: Connection.msgproperties() + + Returns an object specifying the properties of messages used in advanced + queuing. See :ref:`msgproperties` for more information. + + .. versionadded:: development + + .. note:: + + This method is an extension to the DB API definition. + + .. attribute:: Connection.nencoding This read-only attribute returns the IANA character set name of the national diff --git a/doc/index.rst b/doc/index.rst index d724d0c..bc313b3 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -22,6 +22,7 @@ Contents: subscription.rst lob.rst objecttype.rst + aq.rst releasenotes.rst license.rst diff --git a/doc/module.rst b/doc/module.rst index e323f64..8ad7815 100644 --- a/doc/module.rst +++ b/doc/module.rst @@ -249,6 +249,167 @@ Global This attribute is an extension to the DB API definition. +Advanced Queuing: Common +------------------------ + +.. note:: + + These constants are extensions to the DB API definition. + + +.. data:: MSG_BUFFERED + + This constant is used to specify that enqueue/dequeue operations should + enqueue or dequeue buffered messages. + + +.. data:: MSG_EXPIRED + + This constant is used to specify that the message has been moved to the + exception queue. + + +.. data:: MSG_NO_DELAY + + This constant is used to specify that the message is available for + immediate dequeuing. + + +.. data:: MSG_NO_EXPIRATION + + This constant is used to specify that the message never expires. + + +.. data:: MSG_PERSISTENT + + This constant is used to specify that enqueue/dequeue operations should + enqueue or dequeue persistent messages (the default). + + +.. data:: MSG_PROCESSED + + This constant is used to specify that the message has been processed and + is retained. + + +.. data:: MSG_READY + + This constant is used to specify that the message is ready to be processed. + + +.. data:: MSG_WAITING + + This constant is used to specify that the message delay has not yet been + reached. + + +Advanced Queuing: Dequeue +------------------------- + +.. note:: + + These constants are extensions to the DB API definition. + + +.. data:: DEQ_BROWSE + + This constant is used to specify that dequeue should read the message + without acquiring any lock on the message (eqivalent to a select statement). + + +.. data:: DEQ_FIRST_MSG + + This constant is used to specify that dequeue should retrieve the first + available message that matches the search criteria. This resets the position + to the beginning of the queue. + + +.. data:: DEQ_IMMEDIATE + + This constant is used to specify that dequeue should perform its work as + part of an independent transaction. + + +.. data:: DEQ_LOCKED + + This constant is used to specify that dequeue should read and obtain a + write lock on the message for the duration of the transaction (equivalent to + a select for update statement). + + +.. data:: DEQ_NEXT_MSG + + This constant is used to specify that dequeue should retrieve the next + available message that matches the search criteria. If the previous message + belongs to a message group, AQ retrieves the next available message that + matches the search criteria and belongs to the message group. This is the + default. + + +.. data:: DEQ_NEXT_TRANSACTION + + This constant is used to specify that dequeue should skip the remainder of + the transaction group and retrieve the first message of the next transaction + group. This option can only be used if message grouping is enabled for the + current queue. + + +.. data:: DEQ_NO_WAIT + + This constant is used to specify that dequeue not wait for messages to be + available for dequeuing. + + +.. data:: DEQ_ON_COMMIT + + This constant is used to specify that dequeue should be part of the current + transaction (the default). + + +.. data:: DEQ_REMOVE + + This constant is used to specify that dequeue should read the message and + update or delete it (the default mode). + + +.. data:: DEQ_REMOVE_NODATA + + This constant is used to specify that dequeue should confirm receipt of the + message but not deliver the actual message content. + + +.. data:: DEQ_WAIT_FOREVER + + This constant is used to specify that dequeue should wait forever for + messages to be available for dequeuing (the default wait mode). + + +.. data:: MSG_PERSISTENT_OR_BUFFERED + + This constant is used to specify that dequeue should dequeue either + persistent or buffered messages. + + +Advanced Queuing: Enqueue +------------------------- + +.. note:: + + These constants are extensions to the DB API definition. + + +.. data:: ENQ_IMMEDIATE + + This constant is used to specify that enqueue should perform its work as + part of an independent transaction. + + +.. data:: ENQ_ON_COMMIT + + This constant is used to specify that enqueue should be part of the current + transaction (the default). + + Database Callbacks ------------------ diff --git a/samples/AdvancedQueuing.py b/samples/AdvancedQueuing.py index a2ce6dd..5861f8a 100644 --- a/samples/AdvancedQueuing.py +++ b/samples/AdvancedQueuing.py @@ -64,54 +64,17 @@ book2 = booksType.newobject() book2.TITLE = "Harry Potter and the Philosopher's Stone" book2.AUTHORS = "Rowling, J.K." book2.PRICE = decimal.Decimal("7.99") +options = connection.enqoptions() +messageProperties = connection.msgproperties() for book in (book1, book2): print("Enqueuing book", book.TITLE) - cursor.execute(""" - declare - t_MessageId raw(16); - t_Options dbms_aq.enqueue_options_t; - t_MessageProperties dbms_aq.message_properties_t; - begin - dbms_aq.enqueue( - queue_name => :queueName, - enqueue_options => t_Options, - message_properties => t_MessageProperties, - payload => :book, - msgid => t_MessageId); - end;""", - queueName = QUEUE_NAME, - book = book) + connection.enq(QUEUE_NAME, options, messageProperties, book) connection.commit() # dequeue the messages -var = cursor.var(cx_Oracle.OBJECT, typename = BOOK_TYPE_NAME) -while True: - cursor.execute(""" - declare - t_MessageId raw(16); - t_Options dbms_aq.dequeue_options_t; - t_MessageProperties dbms_aq.message_properties_t; - no_messages exception; - pragma exception_init(no_messages, -25228); - begin - t_Options.navigation := dbms_aq.FIRST_MESSAGE; - t_Options.wait := dbms_aq.NO_WAIT; - begin - dbms_aq.dequeue( - queue_name => :queueName, - dequeue_options => t_Options, - message_properties => t_MessageProperties, - payload => :book, - msgid => t_MessageId); - exception - when no_messages then - :book := null; - end; - end;""", - queueName = QUEUE_NAME, - book = var) - book = var.getvalue() - if book is None: - break +options = connection.deqoptions() +options.navigation = cx_Oracle.DEQ_FIRST_MSG +options.wait = cx_Oracle.DEQ_NO_WAIT +while connection.deq(QUEUE_NAME, options, messageProperties, book): print("Dequeued book", book.TITLE) diff --git a/setup.py b/setup.py index 14976d3..8aad1ce 100644 --- a/setup.py +++ b/setup.py @@ -362,7 +362,7 @@ extension = Extension( extra_compile_args = extraCompileArgs, extra_link_args = extraLinkArgs, sources = ["cx_Oracle.c"], - depends = ["Buffer.c", "Callback.c", "Connection.c", "Cursor.c", + depends = ["AQ.c", "Buffer.c", "Callback.c", "Connection.c", "Cursor.c", "CursorVar.c", "DateTimeVar.c", "Environment.c", "Error.c", "ExternalLobVar.c", "IntervalVar.c", "LobVar.c", "LongVar.c", "NumberVar.c", "Object.c", "ObjectType.c", "ObjectVar.c",