Added support for using the C API for advanced queuing.

This commit is contained in:
Anthony Tuininga 2016-03-01 15:34:05 -07:00
parent 9c8cc1160b
commit 0f3b0f17c0
9 changed files with 1406 additions and 45 deletions

760
AQ.c Normal file
View File

@ -0,0 +1,760 @@
//-----------------------------------------------------------------------------
// AQ.c
// Implements the enqueue and dequeue options and message properties objects
// used in Advanced Queuing.
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
// structures used for handling AQ options and message properties
//-----------------------------------------------------------------------------
typedef struct {
PyObject_HEAD
udt_Environment *environment;
OCIAQEnqOptions *handle;
} udt_EnqOptions;
typedef struct {
PyObject_HEAD
udt_Environment *environment;
OCIAQDeqOptions *handle;
} udt_DeqOptions;
typedef struct {
PyObject_HEAD
udt_Environment *environment;
OCIAQMsgProperties *handle;
} udt_MessageProperties;
//-----------------------------------------------------------------------------
// Declaration of methods used for enqueue options
//-----------------------------------------------------------------------------
static udt_EnqOptions *EnqOptions_New(udt_Environment*);
static void EnqOptions_Free(udt_EnqOptions*);
static PyObject *EnqOptions_GetOCIAttr(udt_EnqOptions*, ub4*);
static int EnqOptions_SetOCIAttr(udt_EnqOptions*, PyObject*, ub4*);
//-----------------------------------------------------------------------------
// Declaration of methods used for dequeue options
//-----------------------------------------------------------------------------
static udt_DeqOptions *DeqOptions_New(udt_Environment*);
static void DeqOptions_Free(udt_DeqOptions*);
static PyObject *DeqOptions_GetOCIAttr(udt_DeqOptions*, ub4*);
static int DeqOptions_SetOCIAttr(udt_DeqOptions*, PyObject*, ub4*);
//-----------------------------------------------------------------------------
// Declaration of methods used for message properties
//-----------------------------------------------------------------------------
static udt_MessageProperties *MessageProperties_New(udt_Environment*);
static void MessageProperties_Free(udt_MessageProperties*);
static PyObject *MessageProperties_GetOCIAttr(udt_MessageProperties*, ub4*);
static int MessageProperties_SetOCIAttr(udt_MessageProperties*, PyObject*,
ub4*);
//-----------------------------------------------------------------------------
// constants for OCI attributes
//-----------------------------------------------------------------------------
static ub4 gc_AQAttempts = OCI_ATTR_ATTEMPTS;
static ub4 gc_AQConsumerName = OCI_ATTR_CONSUMER_NAME;
static ub4 gc_AQCorrelation = OCI_ATTR_CORRELATION;
static ub4 gc_AQDelay = OCI_ATTR_DELAY;
static ub4 gc_AQDeliveryMode = OCI_ATTR_MSG_DELIVERY_MODE;
static ub4 gc_AQDeqCondition = OCI_ATTR_DEQCOND;
static ub4 gc_AQDeqMode = OCI_ATTR_DEQ_MODE;
static ub4 gc_AQDeqMsgId = OCI_ATTR_DEQ_MSGID;
static ub4 gc_AQEnqTime = OCI_ATTR_ENQ_TIME;
static ub4 gc_AQExceptionQ = OCI_ATTR_EXCEPTION_QUEUE;
static ub4 gc_AQExpiration = OCI_ATTR_EXPIRATION;
static ub4 gc_AQNavigation = OCI_ATTR_NAVIGATION;
static ub4 gc_AQOriginalMsgId = OCI_ATTR_ORIGINAL_MSGID;
static ub4 gc_AQPriority = OCI_ATTR_PRIORITY;
static ub4 gc_AQState = OCI_ATTR_MSG_STATE;
static ub4 gc_AQTransformation = OCI_ATTR_TRANSFORMATION;
static ub4 gc_AQVisibility = OCI_ATTR_VISIBILITY;
static ub4 gc_AQWait = OCI_ATTR_WAIT;
//-----------------------------------------------------------------------------
// declaration of calculated members for Python type "EnqOptions"
//-----------------------------------------------------------------------------
static PyGetSetDef g_EnqOptionsCalcMembers[] = {
{ "deliverymode", 0, (setter) EnqOptions_SetOCIAttr, 0,
&gc_AQDeliveryMode },
{ "transformation", (getter) EnqOptions_GetOCIAttr,
(setter) EnqOptions_SetOCIAttr, 0, &gc_AQTransformation },
{ "visibility", (getter) EnqOptions_GetOCIAttr,
(setter) EnqOptions_SetOCIAttr, 0, &gc_AQVisibility },
{ NULL }
};
//-----------------------------------------------------------------------------
// declaration of calculated members for Python type "DeqOptions"
//-----------------------------------------------------------------------------
static PyGetSetDef g_DeqOptionsCalcMembers[] = {
{ "condition", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQDeqCondition },
{ "consumername", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQConsumerName },
{ "correlation", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQCorrelation },
{ "deliverymode", 0, (setter) EnqOptions_SetOCIAttr, 0,
&gc_AQDeliveryMode },
{ "mode", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQDeqMode },
{ "msgid", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQDeqMsgId },
{ "navigation", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQNavigation },
{ "transformation", (getter) EnqOptions_GetOCIAttr,
(setter) EnqOptions_SetOCIAttr, 0, &gc_AQTransformation },
{ "visibility", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQVisibility },
{ "wait", (getter) DeqOptions_GetOCIAttr,
(setter) DeqOptions_SetOCIAttr, 0, &gc_AQWait },
{ NULL }
};
//-----------------------------------------------------------------------------
// declaration of calculated members for Python type "MessageProperties"
//-----------------------------------------------------------------------------
static PyGetSetDef g_MessagePropertiesCalcMembers[] = {
{ "attempts", (getter) MessageProperties_GetOCIAttr, 0, 0,
&gc_AQAttempts },
{ "correlation", (getter) MessageProperties_GetOCIAttr,
(setter) MessageProperties_SetOCIAttr, 0, &gc_AQCorrelation },
{ "delay", (getter) MessageProperties_GetOCIAttr,
(setter) MessageProperties_SetOCIAttr, 0, &gc_AQDelay },
{ "deliverymode", (getter) MessageProperties_GetOCIAttr, 0, 0,
&gc_AQDeliveryMode },
{ "enqtime", (getter) MessageProperties_GetOCIAttr, 0, 0, &gc_AQEnqTime },
{ "exceptionq", (getter) MessageProperties_GetOCIAttr,
(setter) MessageProperties_SetOCIAttr, 0, &gc_AQExceptionQ },
{ "expiration", (getter) MessageProperties_GetOCIAttr,
(setter) MessageProperties_SetOCIAttr, 0, &gc_AQExpiration },
{ "msgid", (getter) MessageProperties_GetOCIAttr,
(setter) MessageProperties_SetOCIAttr, 0, &gc_AQOriginalMsgId },
{ "priority", (getter) MessageProperties_GetOCIAttr,
(setter) MessageProperties_SetOCIAttr, 0, &gc_AQPriority },
{ "state", (getter) MessageProperties_GetOCIAttr, 0, 0, &gc_AQState },
{ NULL }
};
//-----------------------------------------------------------------------------
// Python type declarations
//-----------------------------------------------------------------------------
static PyTypeObject g_EnqOptionsType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.EnqOptions", // tp_name
sizeof(udt_EnqOptions), // tp_basicsize
0, // tp_itemsize
(destructor) EnqOptions_Free, // tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
0, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
0, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0, // tp_doc
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
0, // tp_methods
0, // tp_members
g_EnqOptionsCalcMembers, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
0, // tp_init
0, // tp_alloc
0, // tp_new
0, // tp_free
0, // tp_is_gc
0 // tp_bases
};
static PyTypeObject g_DeqOptionsType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.DeqOptions", // tp_name
sizeof(udt_DeqOptions), // tp_basicsize
0, // tp_itemsize
(destructor) DeqOptions_Free, // tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
0, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
0, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0, // tp_doc
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
0, // tp_methods
0, // tp_members
g_DeqOptionsCalcMembers, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
0, // tp_init
0, // tp_alloc
0, // tp_new
0, // tp_free
0, // tp_is_gc
0 // tp_bases
};
static PyTypeObject g_MessagePropertiesType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.MessageProperties", // tp_name
sizeof(udt_MessageProperties), // tp_basicsize
0, // tp_itemsize
(destructor) MessageProperties_Free,// tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
0, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
0, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0, // tp_doc
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
0, // tp_methods
0, // tp_members
g_MessagePropertiesCalcMembers, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
0, // tp_init
0, // tp_alloc
0, // tp_new
0, // tp_free
0, // tp_is_gc
0 // tp_bases
};
//-----------------------------------------------------------------------------
// EnqOptions_New()
// Create a new enqueue options object.
//-----------------------------------------------------------------------------
static udt_EnqOptions *EnqOptions_New(
udt_Environment *env) // environment in which to create
{
udt_EnqOptions *self;
sword status;
self = (udt_EnqOptions*) g_EnqOptionsType.tp_alloc(&g_EnqOptionsType, 0);
if (!self)
return NULL;
Py_INCREF(env);
self->environment = env;
status = OCIDescriptorAlloc(env->handle, (dvoid**) &self->handle,
OCI_DTYPE_AQENQ_OPTIONS, 0, 0);
if (Environment_CheckForError(env, status, "EnqOptions_New()") < 0) {
Py_DECREF(self);
return NULL;
}
return self;
}
//-----------------------------------------------------------------------------
// EnqOptions_Free()
// Free the memory associated with the enqueue options object.
//-----------------------------------------------------------------------------
static void EnqOptions_Free(
udt_EnqOptions *self) // object to free
{
if (self->handle)
OCIDescriptorFree(self->handle, OCI_DTYPE_AQENQ_OPTIONS);
Py_CLEAR(self->environment);
Py_TYPE(self)->tp_free((PyObject*) self);
}
//-----------------------------------------------------------------------------
// EnqOptions_GetOCIAttr()
// Get the value of the OCI attribute.
//-----------------------------------------------------------------------------
static PyObject *EnqOptions_GetOCIAttr(
udt_EnqOptions *self, // options object
ub4 *attribute) // OCI attribute type
{
ub4 valueLength, ub4Value;
dvoid *ociValue = NULL;
char *textValue;
sword status;
// get the value from the OCI
switch (*attribute) {
case OCI_ATTR_VISIBILITY:
ociValue = &ub4Value;
break;
case OCI_ATTR_TRANSFORMATION:
ociValue = &textValue;
break;
};
status = OCIAttrGet(self->handle, OCI_DTYPE_AQENQ_OPTIONS, ociValue,
&valueLength, *attribute, self->environment->errorHandle);
if (Environment_CheckForError(self->environment, status,
"EnqOptions_GetOCIAttr()") < 0)
return NULL;
if (*attribute == gc_AQTransformation) {
if (!textValue)
Py_RETURN_NONE;
return cxString_FromEncodedString(textValue, valueLength,
self->environment->encoding);
}
return PyInt_FromLong(ub4Value);
}
//-----------------------------------------------------------------------------
// EnqOptions_SetOCIAttr()
// Set the value of the OCI attribute.
//-----------------------------------------------------------------------------
static int EnqOptions_SetOCIAttr(
udt_EnqOptions *self, // options object
PyObject *value, // value to set
ub4 *attribute) // OCI attribute type
{
dvoid *ociValue = NULL;
ub4 valueLength = 0;
udt_Buffer buffer;
ub4 ub4Value;
ub2 ub2Value;
sword status;
switch (*attribute) {
case OCI_ATTR_MSG_DELIVERY_MODE:
ub2Value = (ub2) PyInt_AsLong(value);
if (PyErr_Occurred())
return -1;
ociValue = &ub2Value;
break;
case OCI_ATTR_VISIBILITY:
ub4Value = (ub4) PyInt_AsLong(value);
if (PyErr_Occurred())
return -1;
ociValue = &ub4Value;
break;
case OCI_ATTR_TRANSFORMATION:
if (cxBuffer_FromObject(&buffer, value,
self->environment->encoding) < 0)
return -1;
ociValue = (dvoid*) buffer.ptr;
valueLength = buffer.size;
break;
};
status = OCIAttrSet(self->handle, OCI_DTYPE_AQENQ_OPTIONS,
ociValue, valueLength, *attribute, self->environment->errorHandle);
if (*attribute == gc_AQTransformation)
cxBuffer_Clear(&buffer);
if (Environment_CheckForError(self->environment, status,
"EnqOptions_SetOCIAttr()") < 0)
return -1;
return 0;
}
//-----------------------------------------------------------------------------
// DeqOptions_New()
// Create a new dequeue options object.
//-----------------------------------------------------------------------------
static udt_DeqOptions *DeqOptions_New(
udt_Environment *env) // environment in which to create
{
udt_DeqOptions *self;
sword status;
self = (udt_DeqOptions*) g_DeqOptionsType.tp_alloc(&g_DeqOptionsType, 0);
if (!self)
return NULL;
Py_INCREF(env);
self->environment = env;
status = OCIDescriptorAlloc(env->handle, (dvoid**) &self->handle,
OCI_DTYPE_AQDEQ_OPTIONS, 0, 0);
if (Environment_CheckForError(env, status, "DeqOptions_New()") < 0) {
Py_DECREF(self);
return NULL;
}
return self;
}
//-----------------------------------------------------------------------------
// DeqOptions_Free()
// Free the memory associated with the dequeue options object.
//-----------------------------------------------------------------------------
static void DeqOptions_Free(
udt_DeqOptions *self) // object to free
{
if (self->handle)
OCIDescriptorFree(self->handle, OCI_DTYPE_AQDEQ_OPTIONS);
Py_CLEAR(self->environment);
Py_TYPE(self)->tp_free((PyObject*) self);
}
//-----------------------------------------------------------------------------
// DeqOptions_GetOCIAttr()
// Get the value of the OCI attribute.
//-----------------------------------------------------------------------------
static PyObject *DeqOptions_GetOCIAttr(
udt_DeqOptions *self, // options object
ub4 *attribute) // OCI attribute type
{
ub4 valueLength, ub4Value;
dvoid *ociValue = NULL;
char *rawValuePtr;
OCIRaw *rawValue;
char *textValue;
sword status;
// get the value from the OCI
switch (*attribute) {
case OCI_ATTR_DEQ_MODE:
case OCI_ATTR_NAVIGATION:
case OCI_ATTR_VISIBILITY:
case OCI_ATTR_WAIT:
ociValue = &ub4Value;
break;
case OCI_ATTR_CONSUMER_NAME:
case OCI_ATTR_CORRELATION:
case OCI_ATTR_DEQCOND:
case OCI_ATTR_TRANSFORMATION:
ociValue = &textValue;
break;
case OCI_ATTR_DEQ_MSGID:
rawValue = NULL;
ociValue = &rawValue;
break;
};
status = OCIAttrGet(self->handle, OCI_DTYPE_AQDEQ_OPTIONS, ociValue,
&valueLength, *attribute, self->environment->errorHandle);
if (Environment_CheckForError(self->environment, status,
"DeqOptions_GetOCIAttr()") < 0)
return NULL;
if (ociValue == &textValue) {
if (!textValue)
Py_RETURN_NONE;
return cxString_FromEncodedString(textValue, valueLength,
self->environment->encoding);
} else if (ociValue == &rawValue) {
if (!rawValue)
Py_RETURN_NONE;
rawValuePtr = (char*) OCIRawPtr(self->environment->handle, rawValue);
valueLength = OCIRawSize(self->environment->handle, rawValue);
return PyBytes_FromStringAndSize(rawValuePtr, valueLength);
}
return PyInt_FromLong(ub4Value);
}
//-----------------------------------------------------------------------------
// DeqOptions_SetOCIAttr()
// Set the value of the OCI attribute.
//-----------------------------------------------------------------------------
static int DeqOptions_SetOCIAttr(
udt_DeqOptions *self, // options object
PyObject *value, // value to set
ub4 *attribute) // OCI attribute type
{
Py_ssize_t rawValueLength;
OCIRaw *rawValue = NULL;
dvoid *ociValue = NULL;
ub4 valueLength = 0;
udt_Buffer buffer;
char *rawValuePtr;
ub4 ub4Value;
ub2 ub2Value;
sword status;
cxBuffer_Init(&buffer);
switch (*attribute) {
case OCI_ATTR_MSG_DELIVERY_MODE:
ub2Value = (ub2) PyInt_AsLong(value);
if (PyErr_Occurred())
return -1;
ociValue = &ub2Value;
break;
case OCI_ATTR_DEQ_MODE:
case OCI_ATTR_NAVIGATION:
case OCI_ATTR_VISIBILITY:
case OCI_ATTR_WAIT:
ub4Value = (ub4) PyInt_AsLong(value);
if (PyErr_Occurred())
return -1;
ociValue = &ub4Value;
break;
case OCI_ATTR_CONSUMER_NAME:
case OCI_ATTR_CORRELATION:
case OCI_ATTR_DEQCOND:
case OCI_ATTR_TRANSFORMATION:
if (cxBuffer_FromObject(&buffer, value,
self->environment->encoding) < 0)
return -1;
ociValue = (dvoid*) buffer.ptr;
valueLength = buffer.size;
break;
case OCI_ATTR_DEQ_MSGID:
if (PyBytes_AsStringAndSize(value, &rawValuePtr,
&rawValueLength) < 0)
return -1;
status = OCIRawAssignBytes(self->environment->handle,
self->environment->errorHandle, (const ub1*) rawValuePtr,
(ub4) rawValueLength, &rawValue);
if (Environment_CheckForError(self->environment, status,
"DeqOptions_SetOCIAttr(): assign raw value") < 0)
return -1;
ociValue = (dvoid*) rawValue;
break;
};
status = OCIAttrSet(self->handle, OCI_DTYPE_AQDEQ_OPTIONS,
ociValue, valueLength, *attribute, self->environment->errorHandle);
cxBuffer_Clear(&buffer);
if (rawValue)
OCIRawResize(self->environment->handle, self->environment->errorHandle,
0, &rawValue);
if (Environment_CheckForError(self->environment, status,
"DeqOptions_SetOCIAttr(): set value") < 0)
return -1;
return 0;
}
//-----------------------------------------------------------------------------
// MessageProperties_New()
// Create a new dequeue options object.
//-----------------------------------------------------------------------------
static udt_MessageProperties *MessageProperties_New(
udt_Environment *env) // environment in which to create
{
udt_MessageProperties *self;
sword status;
self = (udt_MessageProperties*)
g_MessagePropertiesType.tp_alloc(&g_MessagePropertiesType, 0);
if (!self)
return NULL;
Py_INCREF(env);
self->environment = env;
status = OCIDescriptorAlloc(env->handle, (dvoid**) &self->handle,
OCI_DTYPE_AQMSG_PROPERTIES, 0, 0);
if (Environment_CheckForError(env, status,
"MessageProperties_New()") < 0) {
Py_DECREF(self);
return NULL;
}
return self;
}
//-----------------------------------------------------------------------------
// MessageProperties_Free()
// Free the memory associated with the message properties object.
//-----------------------------------------------------------------------------
static void MessageProperties_Free(
udt_MessageProperties *self) // object to free
{
if (self->handle)
OCIDescriptorFree(self->handle, OCI_DTYPE_AQMSG_PROPERTIES);
Py_CLEAR(self->environment);
Py_TYPE(self)->tp_free((PyObject*) self);
}
//-----------------------------------------------------------------------------
// MessageProperties_GetOCIAttr()
// Get the value of the OCI attribute.
//-----------------------------------------------------------------------------
static PyObject *MessageProperties_GetOCIAttr(
udt_MessageProperties *self, // options object
ub4 *attribute) // OCI attribute type
{
ub4 valueLength, ub4Value;
dvoid *ociValue = NULL;
char *rawValuePtr;
OCIDate dateValue;
OCIRaw *rawValue;
char *textValue;
sb4 sb4Value;
ub2 ub2Value;
sword status;
// get the value from the OCI
switch (*attribute) {
case OCI_ATTR_MSG_DELIVERY_MODE:
ociValue = &ub2Value;
break;
case OCI_ATTR_ATTEMPTS:
case OCI_ATTR_DELAY:
case OCI_ATTR_EXPIRATION:
case OCI_ATTR_PRIORITY:
ociValue = &sb4Value;
break;
case OCI_ATTR_MSG_STATE:
ociValue = &ub4Value;
break;
case OCI_ATTR_CORRELATION:
case OCI_ATTR_EXCEPTION_QUEUE:
ociValue = &textValue;
break;
case OCI_ATTR_ENQ_TIME:
ociValue = &dateValue;
break;
case OCI_ATTR_ORIGINAL_MSGID:
rawValue = NULL;
ociValue = &rawValue;
break;
};
status = OCIAttrGet(self->handle, OCI_DTYPE_AQMSG_PROPERTIES, ociValue,
&valueLength, *attribute, self->environment->errorHandle);
if (Environment_CheckForError(self->environment, status,
"MessageProperties_GetOCIAttr()") < 0)
return NULL;
if (ociValue == &textValue) {
if (!textValue)
Py_RETURN_NONE;
return cxString_FromEncodedString(textValue, valueLength,
self->environment->encoding);
} else if (ociValue == &rawValue) {
if (!rawValue)
Py_RETURN_NONE;
rawValuePtr = (char*) OCIRawPtr(self->environment->handle, rawValue);
valueLength = OCIRawSize(self->environment->handle, rawValue);
return PyBytes_FromStringAndSize(rawValuePtr, valueLength);
} else if (ociValue == &dateValue)
return OracleDateToPythonDate(&vt_DateTime, &dateValue);
else if (ociValue == &ub2Value)
return PyInt_FromLong(ub2Value);
else if (ociValue == &sb4Value)
return PyInt_FromLong(sb4Value);
return PyInt_FromLong(ub4Value);
}
//-----------------------------------------------------------------------------
// MessageProperties_SetOCIAttr()
// Set the value of the OCI attribute.
//-----------------------------------------------------------------------------
static int MessageProperties_SetOCIAttr(
udt_MessageProperties *self, // options object
PyObject *value, // value to set
ub4 *attribute) // OCI attribute type
{
Py_ssize_t rawValueLength;
OCIRaw *rawValue = NULL;
dvoid *ociValue = NULL;
ub4 valueLength = 0;
udt_Buffer buffer;
char *rawValuePtr;
ub4 ub4Value;
ub2 ub2Value;
sword status;
cxBuffer_Init(&buffer);
switch (*attribute) {
case OCI_ATTR_MSG_DELIVERY_MODE:
ub2Value = (ub2) PyInt_AsLong(value);
if (PyErr_Occurred())
return -1;
ociValue = &ub2Value;
break;
case OCI_ATTR_DEQ_MODE:
case OCI_ATTR_NAVIGATION:
case OCI_ATTR_VISIBILITY:
case OCI_ATTR_WAIT:
ub4Value = (ub4) PyInt_AsLong(value);
if (PyErr_Occurred())
return -1;
ociValue = &ub4Value;
break;
case OCI_ATTR_CONSUMER_NAME:
case OCI_ATTR_CORRELATION:
case OCI_ATTR_DEQCOND:
case OCI_ATTR_TRANSFORMATION:
if (cxBuffer_FromObject(&buffer, value,
self->environment->encoding) < 0)
return -1;
ociValue = (dvoid*) buffer.ptr;
valueLength = buffer.size;
break;
case OCI_ATTR_DEQ_MSGID:
if (PyBytes_AsStringAndSize(value, &rawValuePtr,
&rawValueLength) < 0)
return -1;
status = OCIRawAssignBytes(self->environment->handle,
self->environment->errorHandle, (const ub1*) rawValuePtr,
(ub4) rawValueLength, &rawValue);
if (Environment_CheckForError(self->environment, status,
"MessageProperties_SetOCIAttr(): assign raw value") < 0)
return -1;
ociValue = (dvoid*) rawValue;
break;
};
status = OCIAttrSet(self->handle, OCI_DTYPE_AQMSG_PROPERTIES,
ociValue, valueLength, *attribute, self->environment->errorHandle);
cxBuffer_Clear(&buffer);
if (rawValue)
OCIRawResize(self->environment->handle, self->environment->errorHandle,
0, &rawValue);
if (Environment_CheckForError(self->environment, status,
"MessageProperties_SetOCIAttr(): set value") < 0)
return -1;
return 0;
}

View File

@ -66,6 +66,11 @@ static PyObject *Connection_ContextManagerExit(udt_Connection*, PyObject*);
static PyObject *Connection_ChangePasswordExternal(udt_Connection*, PyObject*);
static PyObject *Connection_GetType(udt_Connection*, PyObject*);
static PyObject *Connection_GetStmtCacheSize(udt_Connection*, void*);
static PyObject *Connection_NewEnqueueOptions(udt_Connection*, PyObject*);
static PyObject *Connection_NewDequeueOptions(udt_Connection*, PyObject*);
static PyObject *Connection_NewMessageProperties(udt_Connection*, PyObject*);
static PyObject *Connection_Dequeue(udt_Connection*, PyObject*, PyObject*);
static PyObject *Connection_Enqueue(udt_Connection*, PyObject*, PyObject*);
static int Connection_SetStmtCacheSize(udt_Connection*, PyObject*, void*);
#if ORACLE_VERSION_HEX >= ORACLE_VERSION(10, 2)
static PyObject *Connection_GetOCIAttr(udt_Connection*, ub4*);
@ -114,6 +119,12 @@ static PyMethodDef g_ConnectionMethods[] = {
{ "changepassword", (PyCFunction) Connection_ChangePasswordExternal,
METH_VARARGS },
{ "gettype", (PyCFunction) Connection_GetType, METH_VARARGS },
{ "deqoptions", (PyCFunction) Connection_NewDequeueOptions, METH_NOARGS },
{ "enqoptions", (PyCFunction) Connection_NewEnqueueOptions, METH_NOARGS },
{ "msgproperties", (PyCFunction) Connection_NewMessageProperties,
METH_NOARGS },
{ "deq", (PyCFunction) Connection_Dequeue, METH_VARARGS | METH_KEYWORDS },
{ "enq", (PyCFunction) Connection_Enqueue, METH_VARARGS | METH_KEYWORDS },
{ NULL }
};
@ -798,6 +809,7 @@ static int Connection_Connect(
#if ORACLE_VERSION_HEX >= ORACLE_VERSION(10, 2)
#include "Subscription.c"
#endif
#include "AQ.c"
//-----------------------------------------------------------------------------
@ -1557,6 +1569,155 @@ static PyObject *Connection_Cancel(
}
//-----------------------------------------------------------------------------
// Connection_NewEnqueueOptions()
// Creates a new enqueue options object and returns it.
//-----------------------------------------------------------------------------
static PyObject *Connection_NewEnqueueOptions(
udt_Connection *self, // connection
PyObject *args) // none
{
return (PyObject*) EnqOptions_New(self->environment);
}
//-----------------------------------------------------------------------------
// Connection_NewDequeueOptions()
// Creates a new dequeue options object and returns it.
//-----------------------------------------------------------------------------
static PyObject *Connection_NewDequeueOptions(
udt_Connection *self, // connection
PyObject *args) // none
{
return (PyObject*) DeqOptions_New(self->environment);
}
//-----------------------------------------------------------------------------
// Connection_NewMessageProperties()
// Creates a new message properties object and returns it.
//-----------------------------------------------------------------------------
static PyObject *Connection_NewMessageProperties(
udt_Connection *self, // connection
PyObject *args) // none
{
return (PyObject*) MessageProperties_New(self->environment);
}
//-----------------------------------------------------------------------------
// Connection_Dequeue()
// Dequeues a message using Advanced Queuing capabilities. The message ID is
// returned if a message is available or None if no message is available.
//-----------------------------------------------------------------------------
static PyObject *Connection_Dequeue(
udt_Connection *self, // connection
PyObject* args, // arguments
PyObject* keywordArgs) // keyword arguments
{
static char *keywordList[] = { "name", "options", "msgproperties",
"payload", NULL };
PyObject *nameObj, *excType, *excValue, *traceback;
udt_MessageProperties *propertiesObj;
udt_DeqOptions *optionsObj;
udt_Object *payloadObj;
udt_Buffer nameBuffer;
char *messageIdValue;
OCIRaw *messageId;
ub4 messageIdSize;
udt_Error *error;
sword status;
// parse arguments
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "OO!O!O!", keywordList,
&nameObj, &g_DeqOptionsType, &optionsObj, &g_MessagePropertiesType,
&propertiesObj, &g_ObjectType, &payloadObj))
return NULL;
if (cxBuffer_FromObject(&nameBuffer, nameObj,
self->environment->encoding) < 0)
return NULL;
// enqueue payload
messageId = NULL;
status = OCIAQDeq(self->handle, self->environment->errorHandle,
(oratext*) nameBuffer.ptr, optionsObj->handle,
propertiesObj->handle, payloadObj->objectType->tdo,
&payloadObj->instance, &payloadObj->indicator, &messageId,
OCI_DEFAULT);
cxBuffer_Clear(&nameBuffer);
if (Environment_CheckForError(self->environment, status,
"Connection_Dequeue()") < 0) {
PyErr_Fetch(&excType, &excValue, &traceback);
if (excValue) {
error = (udt_Error*) excValue;
if (error->code == 25228) {
Py_XDECREF(excType);
Py_XDECREF(excValue);
Py_XDECREF(traceback);
Py_RETURN_NONE;
}
}
PyErr_Restore(excType, excValue, traceback);
return NULL;
}
// determine the message id
messageIdValue = (char*) OCIRawPtr(self->environment->handle, messageId);
messageIdSize = OCIRawSize(self->environment->handle, messageId);
return PyBytes_FromStringAndSize(messageIdValue, messageIdSize);
}
//-----------------------------------------------------------------------------
// Connection_Enqueue()
// Enqueues a message using Advanced Queuing capabilities. The message ID is
// returned.
//-----------------------------------------------------------------------------
static PyObject *Connection_Enqueue(
udt_Connection *self, // connection
PyObject* args, // arguments
PyObject* keywordArgs) // keyword arguments
{
static char *keywordList[] = { "name", "options", "msgproperties",
"payload", NULL };
udt_MessageProperties *propertiesObj;
udt_EnqOptions *optionsObj;
udt_Object *payloadObj;
udt_Buffer nameBuffer;
char *messageIdValue;
PyObject *nameObj;
OCIRaw *messageId;
ub4 messageIdSize;
sword status;
// parse arguments
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "OO!O!O!", keywordList,
&nameObj, &g_EnqOptionsType, &optionsObj, &g_MessagePropertiesType,
&propertiesObj, &g_ObjectType, &payloadObj))
return NULL;
if (cxBuffer_FromObject(&nameBuffer, nameObj,
self->environment->encoding) < 0)
return NULL;
// enqueue payload
messageId = NULL;
status = OCIAQEnq(self->handle, self->environment->errorHandle,
(oratext*) nameBuffer.ptr, optionsObj->handle,
propertiesObj->handle, payloadObj->objectType->tdo,
&payloadObj->instance, &payloadObj->indicator, &messageId,
OCI_DEFAULT);
cxBuffer_Clear(&nameBuffer);
if (Environment_CheckForError(self->environment, status,
"Connection_Enqueue()") < 0)
return NULL;
// determine the message id
messageIdValue = (char*) OCIRawPtr(self->environment->handle, messageId);
messageIdSize = OCIRawSize(self->environment->handle, messageId);
return PyBytes_FromStringAndSize(messageIdValue, messageIdSize);
}
//-----------------------------------------------------------------------------
// Connection_RegisterCallback()
// Register a callback for the OCI function.

View File

@ -358,6 +358,9 @@ static PyObject *Module_Initialize(void)
MAKE_TYPE_READY(&g_ObjectAttributeType);
MAKE_TYPE_READY(&g_ExternalLobVarType);
MAKE_TYPE_READY(&g_ObjectType);
MAKE_TYPE_READY(&g_EnqOptionsType);
MAKE_TYPE_READY(&g_DeqOptionsType);
MAKE_TYPE_READY(&g_MessagePropertiesType);
#if ORACLE_VERSION_HEX >= ORACLE_VERSION(10, 2)
MAKE_TYPE_READY(&g_SubscriptionType);
MAKE_TYPE_READY(&g_MessageType);
@ -442,6 +445,10 @@ static PyObject *Module_Initialize(void)
ADD_TYPE_OBJECT("SessionPool", &g_SessionPoolType)
ADD_TYPE_OBJECT("_Error", &g_ErrorType)
ADD_TYPE_OBJECT("Object", &g_ObjectType)
ADD_TYPE_OBJECT("ObjectType", &g_ObjectTypeType)
ADD_TYPE_OBJECT("EnqOptions", &g_EnqOptionsType)
ADD_TYPE_OBJECT("DeqOptions", &g_DeqOptionsType)
ADD_TYPE_OBJECT("MessageProperties", &g_MessagePropertiesType)
// the name "connect" is required by the DB API
ADD_TYPE_OBJECT("connect", &g_ConnectionType)
@ -556,6 +563,30 @@ static PyObject *Module_Initialize(void)
ADD_OCI_CONSTANT(SUBSCR_QOS_HAREG)
#endif
// add constants for advanced queueing
ADD_OCI_CONSTANT(DEQ_BROWSE)
ADD_OCI_CONSTANT(DEQ_FIRST_MSG)
ADD_OCI_CONSTANT(DEQ_IMMEDIATE)
ADD_OCI_CONSTANT(DEQ_LOCKED)
ADD_OCI_CONSTANT(DEQ_NEXT_MSG)
ADD_OCI_CONSTANT(DEQ_NEXT_TRANSACTION)
ADD_OCI_CONSTANT(DEQ_NO_WAIT)
ADD_OCI_CONSTANT(DEQ_ON_COMMIT)
ADD_OCI_CONSTANT(DEQ_REMOVE)
ADD_OCI_CONSTANT(DEQ_REMOVE_NODATA)
ADD_OCI_CONSTANT(DEQ_WAIT_FOREVER)
ADD_OCI_CONSTANT(ENQ_IMMEDIATE)
ADD_OCI_CONSTANT(ENQ_ON_COMMIT)
ADD_OCI_CONSTANT(MSG_NO_DELAY)
ADD_OCI_CONSTANT(MSG_NO_EXPIRATION)
ADD_OCI_CONSTANT(MSG_PERSISTENT)
ADD_OCI_CONSTANT(MSG_BUFFERED)
ADD_OCI_CONSTANT(MSG_EXPIRED)
ADD_OCI_CONSTANT(MSG_PERSISTENT_OR_BUFFERED)
ADD_OCI_CONSTANT(MSG_READY)
ADD_OCI_CONSTANT(MSG_PROCESSED)
ADD_OCI_CONSTANT(MSG_WAITING)
return module;
}

218
doc/aq.rst Normal file
View File

@ -0,0 +1,218 @@
.. _aq:
****************
Advanced Queuing
****************
.. _deqoptions:
---------------
Dequeue Options
---------------
.. note::
This object is an extension to the DB API. It is returned by the
:meth:`~Connection.deqoptions()` call and is used in calls to
:meth:`~Connection.deq()`.
.. attribute:: DeqOptions.condition
This attribute specifies a boolean expression similar to the where clause
of a SQL query. The boolean expression can include conditions on message
properties, user data properties and PL/SQL or SQL functions.
.. attribute:: DeqOptions.consumername
This attribute specifies the name of the consumer. Only messages matching
the consumer name will be accessed. If the queue is not set up for multiple
consumers this attribute should not be set.
.. attribute:: DeqOptions.correlation
This attribute specifies the correlation identifier of the message to be
dequeued. Special pattern-matching characters, such as the percent sign (%)
and the underscore (_), can be used. If multiple messages satisfy the
pattern, the order of dequeuing is indeterminate.
.. attribute:: DeqOptions.deliverymode
This write-only attribute specifies whether persistent
(:data:`cx_Oracle.MSG_PERSISTENT`), buffered
(:data:`cx_Oracle.MSG_BUFFERED`) or both
(:data:`cx_Oracle.MSG_PERSISTENT_OR_BUFFERED`) types of messages should
be dequeued.
.. attribute:: DeqOptions.mode
This attribute specifies the locking behaviour associated with the dequeue
operation. The valid values are :data:`cx_Oracle.DEQ_BROWSE` (the message is
read without acquiring any lock on the message),
:data:`cx_Oracle.DEQ_LOCKED` (the message is read and a write lock is
obtained), :data:`cx_Oracle.DEQ_REMOVE` (the message is read and updated
or deleted, which is also the default value) or
:data:`cx_Oracle.DEQ_REMOVE_NODATA` (which confirms receipt of the message
but does not deliver the actual message content).
.. attribute:: DeqOptions.msgid
This attribute specifies the identifier of the message to be dequeued.
.. attribute:: DeqOptions.navigation
This attribute specifies the position of the message that is retrieved.
The valid values are :data:`cx_Oracle.DEQ_FIRST_MSG` (retrieves the first
available message that matches the search criteria),
:data:`cx_Oracle.DEQ_NEXT_MSG` (retrieves the next available message that
matches the search criteria, which is also the default value), or
:data:`cx_Oracle.DEQ_NEXT_TRANSACTION` (skips the remainder of the current
transaction group and retrieves the first message of the next transaction
group).
.. attribute:: DeqOptions.transformation
This attribute specifies the name of the transformation that must be applied
after the message is enqueued from the database but before it is returned to
the calling application. The transformation must be created using
dbms_transform.
.. attribute:: DeqOptions.visibility
This attribute specifies the transactional behavior of the enqueue request.
It must be one of :data:`cx_Oracle.ENQ_ON_COMMIT` indicating that the
enqueue is part of the current transaction (the default) or
:data:`cx_Oracle.ENQ_IMMEDIATE` indicating that the enqueue operation
constitutes a transaction of its own. This parameter is ignored when using
the DEQ_BROWSE mode.
.. _enqoptions:
---------------
Enqueue Options
---------------
.. note::
This object is an extension to the DB API. It is returned by the
:meth:`~Connection.enqoptions()` call and is used in calls to
:meth:`~Connection.enq()`.
.. attribute:: EnqOptions.deliverymode
This write-only attribute specifies whether persistent
(:data:`cx_Oracle.MSG_PERSISTENT`) or buffered
(:data:`cx_Oracle.MSG_BUFFERED`) messages should be enqueued.
.. attribute:: EnqOptions.transformation
This attribute specifies the name of the transformation that must be applied
before the message is enqueued into the database. The transformation must
be created using dbms_transform.
.. attribute:: EnqOptions.visibility
This attribute specifies the transactional behavior of the enqueue request.
It must be one of :data:`cx_Oracle.ENQ_ON_COMMIT` indicating that the
enqueue is part of the current transaction (the default) or
:data:`cx_Oracle.ENQ_IMMEDIATE` indicating that the enqueue operation
constitutes a transaction of its own.
.. _msgproperties:
------------------
Message Properties
------------------
.. note::
This object is an extension to the DB API. It is returned by the
:meth:`~Connection.msgproperties()` call and is used in calls to
:meth:`~Connection.deq` and :meth:`~Connection.enq()`.
.. attribute:: MessageProperties.attempts
This read-only attribute specifies the number of attempts that have been
made to dequeue the message.
.. attribute:: MessageProperties.correlation
This attribute specifies the correlation used when the message was enqueued.
.. attribute:: MessageProperties.delay
This attribute specifies the number of seconds to delay an enqueued message.
Any integer is acceptable but the constant :data:`cx_Oracle.MSG_NO_DELAY`
can also be used indicating that the message is available for immediate
dequeuing.
.. attribute:: EnqOptions.deliverymode
This read-only attribute specifies whether a persistent
(:data:`cx_Oracle.MSG_PERSISTENT`) or buffered
(:data:`cx_Oracle.MSG_BUFFERED`) message was dequeued.
.. attribute:: EnqOptions.enqtime
This read-only attribute specifies the time that the message was enqueued.
.. attribute:: EnqOptions.exceptionq
This attribute specifies the name of the queue to which the message is
moved if it cannot be processed successfully. Messages are moved if the
number of unsuccessful dequeue attempts has exceeded the maximum number of
retries or if the message has expired. All messages in the exception queue
are in the EXPIRED state. The default value is the name of the exception
queue associated with the queue table.
.. attribute:: EnqOptions.expiration
This attribute specifies, in seconds, how long the message is available for
dequeuing. This attribute is an offset from the delay attribute. Expiration
processing requires the queue monitor to be running. Any integer is accepted
but the constant :data:`cx_Oracle.MSG_NO_EXPIRATION` can also be used
indicating that the message never expires.
.. attribute:: EnqOptions.msgid
This attribute specifies the id of the message in the last queue that
generated this message.
.. attribute:: EnqOptions.priority
This attribute specifies the priority of the message. A smaller number
indicates a higher priority. The priority can be any integer, including
negative numbers. The default value is zero.
.. attribute:: EnqOptions.state
This read-only attribute specifies the state of the message at the time of
the dequeue. It will be one of :data:`cx_Oracle.MSG_WAITING` (the message
delay has not yet been reached), :data:`cx_Oracle.MSG_READY` (the message
is ready to be processed), :data:`cx_Oracle.MSG_PROCESSED` (the message
has been processed and is retained) or :data:`cx_Oracle.MSG_EXPIRED`
(the message has been moved to the exception queue).

View File

@ -135,6 +135,33 @@ Connection Object
Return a new Cursor object (:ref:`cursorobj`) using the connection.
.. method:: Connection.deq(name, options, msgproperties, payload)
Returns a message id after successfully dequeuing a message. The options
object can be created using :meth:`~Connection.deqoptions()` and the
msgproperties object can be created using
:meth:`~Connection.msgproperties()`. The payload must be an object created
using :meth:`~ObjectType.newobject()`.
.. versionadded:: development
.. note::
This method is an extension to the DB API definition.
.. method:: Connection.deqoptions()
Returns an object specifying the options to use when dequeuing messages.
See :ref:`deqoptions` for more information.
.. versionadded:: development
.. note::
This method is an extension to the DB API definition.
.. attribute:: Connection.dsn
This read-only attribute returns the TNS entry of the database to which a
@ -169,6 +196,33 @@ Connection Object
available in Python 2.x when not built in unicode mode.
.. method:: Connection.enq(name, options, msgproperties, payload)
Returns a message id after successfully enqueuing a message. The options
object can be created using :meth:`~Connection.enqoptions()` and the
msgproperties object can be created using
:meth:`~Connection.msgproperties()`. The payload must be an object created
using :meth:`~ObjectType.newobject()`.
.. versionadded:: development
.. note::
This method is an extension to the DB API definition.
.. method:: Connection.enqoptions()
Returns an object specifying the options to use when enqueuing messages.
See :ref:`enqoptions` for more information.
.. versionadded:: development
.. note::
This method is an extension to the DB API definition.
.. method:: Connection.gettype(name)
Return a type object (:ref:`objecttype`) given its name. This can then be
@ -233,6 +287,18 @@ Connection Object
This attribute is an extension to the DB API definition.
.. method:: Connection.msgproperties()
Returns an object specifying the properties of messages used in advanced
queuing. See :ref:`msgproperties` for more information.
.. versionadded:: development
.. note::
This method is an extension to the DB API definition.
.. attribute:: Connection.nencoding
This read-only attribute returns the IANA character set name of the national

View File

@ -22,6 +22,7 @@ Contents:
subscription.rst
lob.rst
objecttype.rst
aq.rst
releasenotes.rst
license.rst

View File

@ -249,6 +249,167 @@ Global
This attribute is an extension to the DB API definition.
Advanced Queuing: Common
------------------------
.. note::
These constants are extensions to the DB API definition.
.. data:: MSG_BUFFERED
This constant is used to specify that enqueue/dequeue operations should
enqueue or dequeue buffered messages.
.. data:: MSG_EXPIRED
This constant is used to specify that the message has been moved to the
exception queue.
.. data:: MSG_NO_DELAY
This constant is used to specify that the message is available for
immediate dequeuing.
.. data:: MSG_NO_EXPIRATION
This constant is used to specify that the message never expires.
.. data:: MSG_PERSISTENT
This constant is used to specify that enqueue/dequeue operations should
enqueue or dequeue persistent messages (the default).
.. data:: MSG_PROCESSED
This constant is used to specify that the message has been processed and
is retained.
.. data:: MSG_READY
This constant is used to specify that the message is ready to be processed.
.. data:: MSG_WAITING
This constant is used to specify that the message delay has not yet been
reached.
Advanced Queuing: Dequeue
-------------------------
.. note::
These constants are extensions to the DB API definition.
.. data:: DEQ_BROWSE
This constant is used to specify that dequeue should read the message
without acquiring any lock on the message (eqivalent to a select statement).
.. data:: DEQ_FIRST_MSG
This constant is used to specify that dequeue should retrieve the first
available message that matches the search criteria. This resets the position
to the beginning of the queue.
.. data:: DEQ_IMMEDIATE
This constant is used to specify that dequeue should perform its work as
part of an independent transaction.
.. data:: DEQ_LOCKED
This constant is used to specify that dequeue should read and obtain a
write lock on the message for the duration of the transaction (equivalent to
a select for update statement).
.. data:: DEQ_NEXT_MSG
This constant is used to specify that dequeue should retrieve the next
available message that matches the search criteria. If the previous message
belongs to a message group, AQ retrieves the next available message that
matches the search criteria and belongs to the message group. This is the
default.
.. data:: DEQ_NEXT_TRANSACTION
This constant is used to specify that dequeue should skip the remainder of
the transaction group and retrieve the first message of the next transaction
group. This option can only be used if message grouping is enabled for the
current queue.
.. data:: DEQ_NO_WAIT
This constant is used to specify that dequeue not wait for messages to be
available for dequeuing.
.. data:: DEQ_ON_COMMIT
This constant is used to specify that dequeue should be part of the current
transaction (the default).
.. data:: DEQ_REMOVE
This constant is used to specify that dequeue should read the message and
update or delete it (the default mode).
.. data:: DEQ_REMOVE_NODATA
This constant is used to specify that dequeue should confirm receipt of the
message but not deliver the actual message content.
.. data:: DEQ_WAIT_FOREVER
This constant is used to specify that dequeue should wait forever for
messages to be available for dequeuing (the default wait mode).
.. data:: MSG_PERSISTENT_OR_BUFFERED
This constant is used to specify that dequeue should dequeue either
persistent or buffered messages.
Advanced Queuing: Enqueue
-------------------------
.. note::
These constants are extensions to the DB API definition.
.. data:: ENQ_IMMEDIATE
This constant is used to specify that enqueue should perform its work as
part of an independent transaction.
.. data:: ENQ_ON_COMMIT
This constant is used to specify that enqueue should be part of the current
transaction (the default).
Database Callbacks
------------------

View File

@ -64,54 +64,17 @@ book2 = booksType.newobject()
book2.TITLE = "Harry Potter and the Philosopher's Stone"
book2.AUTHORS = "Rowling, J.K."
book2.PRICE = decimal.Decimal("7.99")
options = connection.enqoptions()
messageProperties = connection.msgproperties()
for book in (book1, book2):
print("Enqueuing book", book.TITLE)
cursor.execute("""
declare
t_MessageId raw(16);
t_Options dbms_aq.enqueue_options_t;
t_MessageProperties dbms_aq.message_properties_t;
begin
dbms_aq.enqueue(
queue_name => :queueName,
enqueue_options => t_Options,
message_properties => t_MessageProperties,
payload => :book,
msgid => t_MessageId);
end;""",
queueName = QUEUE_NAME,
book = book)
connection.enq(QUEUE_NAME, options, messageProperties, book)
connection.commit()
# dequeue the messages
var = cursor.var(cx_Oracle.OBJECT, typename = BOOK_TYPE_NAME)
while True:
cursor.execute("""
declare
t_MessageId raw(16);
t_Options dbms_aq.dequeue_options_t;
t_MessageProperties dbms_aq.message_properties_t;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
t_Options.navigation := dbms_aq.FIRST_MESSAGE;
t_Options.wait := dbms_aq.NO_WAIT;
begin
dbms_aq.dequeue(
queue_name => :queueName,
dequeue_options => t_Options,
message_properties => t_MessageProperties,
payload => :book,
msgid => t_MessageId);
exception
when no_messages then
:book := null;
end;
end;""",
queueName = QUEUE_NAME,
book = var)
book = var.getvalue()
if book is None:
break
options = connection.deqoptions()
options.navigation = cx_Oracle.DEQ_FIRST_MSG
options.wait = cx_Oracle.DEQ_NO_WAIT
while connection.deq(QUEUE_NAME, options, messageProperties, book):
print("Dequeued book", book.TITLE)

View File

@ -362,7 +362,7 @@ extension = Extension(
extra_compile_args = extraCompileArgs,
extra_link_args = extraLinkArgs,
sources = ["cx_Oracle.c"],
depends = ["Buffer.c", "Callback.c", "Connection.c", "Cursor.c",
depends = ["AQ.c", "Buffer.c", "Callback.c", "Connection.c", "Cursor.c",
"CursorVar.c", "DateTimeVar.c", "Environment.c", "Error.c",
"ExternalLobVar.c", "IntervalVar.c", "LobVar.c", "LongVar.c",
"NumberVar.c", "Object.c", "ObjectType.c", "ObjectVar.c",