Added support for passing the message to the handler and populating it with the

initial information that will be useful; add the OCI constants to the module
in order to enable different values for subscriptions as well as decode the
message data.
This commit is contained in:
Anthony Tuininga 2009-01-07 22:55:14 +00:00
parent 78d619cfb6
commit 7fc7ede45d
2 changed files with 180 additions and 18 deletions

View File

@ -18,6 +18,13 @@ typedef struct {
ub4 rowids;
} udt_Subscription;
typedef struct {
PyObject_HEAD
ub4 type;
PyObject *dbname;
PyObject *tables;
} udt_Message;
//-----------------------------------------------------------------------------
// Declaration of subscription functions
@ -25,9 +32,34 @@ typedef struct {
static void Subscription_Free(udt_Subscription*);
static PyObject *Subscription_Repr(udt_Subscription*);
static PyObject *Subscription_RegisterQuery(udt_Subscription*, PyObject*);
static void Message_Free(udt_Message*);
//-----------------------------------------------------------------------------
// declaration of methods for Python type "Subscription"
// declaration of members for Python types
//-----------------------------------------------------------------------------
static PyMemberDef g_SubscriptionTypeMembers[] = {
{ "callback", T_OBJECT, offsetof(udt_Subscription, callback), READONLY },
{ "connection", T_OBJECT, offsetof(udt_Subscription, connection),
READONLY },
{ "namespace", T_INT, offsetof(udt_Subscription, namespace), READONLY },
{ "protocol", T_INT, offsetof(udt_Subscription, protocol), READONLY },
{ "timeout", T_INT, offsetof(udt_Subscription, timeout), READONLY },
{ "operations", T_INT, offsetof(udt_Subscription, operations), READONLY },
{ "rowids", T_BOOL, offsetof(udt_Subscription, rowids), READONLY },
{ NULL }
};
static PyMemberDef g_MessageTypeMembers[] = {
{ "type", T_INT, offsetof(udt_Message, type), READONLY },
{ "dbname", T_OBJECT, offsetof(udt_Message, dbname), READONLY },
{ "tables", T_OBJECT, offsetof(udt_Message, tables), READONLY },
{ NULL }
};
//-----------------------------------------------------------------------------
// declaration of methods for Python types
//-----------------------------------------------------------------------------
static PyMethodDef g_SubscriptionTypeMethods[] = {
{ "registerquery", (PyCFunction) Subscription_RegisterQuery,
@ -68,7 +100,52 @@ static PyTypeObject g_SubscriptionType = {
0, // tp_iter
0, // tp_iternext
g_SubscriptionTypeMethods, // tp_methods
0, // tp_members
g_SubscriptionTypeMembers, // tp_members
0, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
0, // tp_init
0, // tp_alloc
0, // tp_new
0, // tp_free
0, // tp_is_gc
0 // tp_bases
};
static PyTypeObject g_MessageType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.Message", // tp_name
sizeof(udt_Message), // tp_basicsize
0, // tp_itemsize
(destructor) Message_Free, // tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
0, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
0, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0, // tp_doc
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
0, // tp_methods
g_MessageTypeMembers, // tp_members
0, // tp_getset
0, // tp_base
0, // tp_dict
@ -84,31 +161,74 @@ static PyTypeObject g_SubscriptionType = {
};
//-----------------------------------------------------------------------------
// Message_Initialize()
// Initialize a new message with the information from the descriptor.
//-----------------------------------------------------------------------------
static int Message_Initialize(
udt_Message *self, // message object to initialize
udt_Environment *env, // environment to use
dvoid *descriptor) // descriptor to get information from
{
ub4 dbnameLength;
text *dbname;
sword status;
// determine type
status = OCIAttrGet(descriptor, OCI_DTYPE_CHDES, &self->type, NULL,
OCI_ATTR_CHDES_NFYTYPE, env->errorHandle);
if (Environment_CheckForError(env, status,
"Subscription_CallbackHandler(): get type") < 0)
return -1;
// determine database name
status = OCIAttrGet(descriptor, OCI_DTYPE_CHDES, &dbname, &dbnameLength,
OCI_ATTR_CHDES_DBNAME, env->errorHandle);
if (Environment_CheckForError(env, status,
"Subscription_CallbackHandler(): get database name") < 0)
return -1;
self->dbname = cxString_FromEncodedString(dbname, dbnameLength);
if (!self->dbname)
return -1;
return 0;
}
//-----------------------------------------------------------------------------
// Subscription_CallbackHandler()
// Routine that performs the actual call.
//-----------------------------------------------------------------------------
static int Subscription_CallbackHandler(
udt_Subscription *self) // subscription object
udt_Subscription *self, // subscription object
udt_Environment *env, // environment to use
dvoid *descriptor) // descriptor to get information from
{
PyObject *result, *args;
udt_Message *message;
// create the message
message = (udt_Message*) g_MessageType.tp_alloc(&g_MessageType, 0);
if (!message)
return -1;
if (Message_Initialize(message, env, descriptor) < 0) {
Py_DECREF(message);
return -1;
}
// create the arguments for the call
printf("creating arguments for call...\n");
args = PyTuple_New(0);
args = PyTuple_Pack(1, message);
Py_DECREF(message);
if (!args)
return -1;
printf("making actual call...\n");
// make the actual call
result = PyObject_Call(self->callback, args, NULL);
printf("Result is %x\n",result);
Py_DECREF(args);
if (!result)
return -1;
Py_DECREF(result);
printf("all looks good here\n");
return 0;
}
@ -122,15 +242,14 @@ static void Subscription_Callback(
OCISubscription *handle, // subscription handle
dvoid *payload, // payload
ub4 *payloadLength, // payload length
dvoid *changeDescriptor, // change descriptor
dvoid *descriptor, // descriptor
ub4 mode) // mode used
{
#ifdef WITH_THREAD
PyThreadState *threadState;
#endif
udt_Environment *env;
printf("Received notification...\n");
printf("self is %x\n", self);
// create new thread state, if necessary
#ifdef WITH_THREAD
threadState = PyThreadState_Swap(NULL);
@ -138,7 +257,6 @@ printf("self is %x\n", self);
PyThreadState_Swap(threadState);
threadState = NULL;
} else {
printf("creating new thread state....\n");
threadState = PyThreadState_New(g_InterpreterState);
if (!threadState) {
PyErr_Print();
@ -149,14 +267,18 @@ printf("creating new thread state....\n");
#endif
// perform the call
printf("performing the call...\n");
if (Subscription_CallbackHandler(self) < 0)
env = Environment_NewFromScratch(0, 0);
if (!env)
PyErr_Print();
else {
if (Subscription_CallbackHandler(self, env, descriptor) < 0)
PyErr_Print();
Py_DECREF(env);
}
// restore thread state, if necessary
#ifdef WITH_THREAD
if (threadState) {
printf("restoring thread state...\n");
PyThreadState_Clear(threadState);
PyEval_ReleaseThread(threadState);
PyThreadState_Delete(threadState);
@ -175,7 +297,6 @@ static int Subscription_Register(
udt_Environment *env;
sword status;
printf("creating registration with pointer %x\n", self);
// create the subscription handle
env = self->connection->environment;
status = OCIHandleAlloc(env->handle, (dvoid**) &self->handle,
@ -294,7 +415,7 @@ static udt_Subscription *Subscription_New(
//-----------------------------------------------------------------------------
// Subscription_Free()
// Free the memory associated with an object type.
// Free the memory associated with a subscription.
//-----------------------------------------------------------------------------
static void Subscription_Free(
udt_Subscription *self) // subscription to free
@ -438,7 +559,7 @@ static PyObject *Subscription_RegisterQuery(
}
// execute the query which registers it
if (Cursor_InternalExecute(cursor, 1) < 0) {
if (Cursor_InternalExecute(cursor, 0) < 0) {
Py_DECREF(cursor);
return NULL;
}
@ -448,3 +569,16 @@ static PyObject *Subscription_RegisterQuery(
return Py_None;
}
//-----------------------------------------------------------------------------
// Message_Free()
// Free the memory associated with a mesasge.
//-----------------------------------------------------------------------------
static void Message_Free(
udt_Message *self) // message to free
{
Py_CLEAR(self->dbname);
Py_CLEAR(self->tables);
Py_TYPE(self)->tp_free((PyObject*) self);
}

View File

@ -34,6 +34,11 @@ typedef int Py_ssize_t;
#define PY_SSIZE_T_MIN INT_MIN
#endif
// define T_BOOL for versions before Python 2.5
#ifndef T_BOOL
#define T_BOOL T_INT
#endif
// define Py_TYPE for versions before Python 2.6
#ifndef Py_TYPE
#define Py_TYPE(ob) (((PyObject*)(ob))->ob_type)
@ -344,6 +349,7 @@ static PyObject *Module_Initialize(void)
MAKE_TYPE_READY(&g_ExternalObjectVarType);
#ifdef ORACLE_10GR2
MAKE_TYPE_READY(&g_SubscriptionType);
MAKE_TYPE_READY(&g_MessageType);
#endif
MAKE_VARIABLE_TYPE_READY(&g_StringVarType);
MAKE_VARIABLE_TYPE_READY(&g_FixedCharVarType);
@ -486,6 +492,28 @@ static PyObject *Module_Initialize(void)
ADD_OCI_CONSTANT(DBSHUTDOWN_IMMEDIATE)
ADD_OCI_CONSTANT(DBSHUTDOWN_TRANSACTIONAL)
ADD_OCI_CONSTANT(DBSHUTDOWN_TRANSACTIONAL_LOCAL)
ADD_OCI_CONSTANT(EVENT_NONE)
ADD_OCI_CONSTANT(EVENT_STARTUP)
ADD_OCI_CONSTANT(EVENT_SHUTDOWN)
ADD_OCI_CONSTANT(EVENT_SHUTDOWN_ANY)
ADD_OCI_CONSTANT(EVENT_DROP_DB)
ADD_OCI_CONSTANT(EVENT_DEREG)
ADD_OCI_CONSTANT(EVENT_OBJCHANGE)
ADD_OCI_CONSTANT(OPCODE_ALLOPS)
ADD_OCI_CONSTANT(OPCODE_ALLROWS)
ADD_OCI_CONSTANT(OPCODE_INSERT)
ADD_OCI_CONSTANT(OPCODE_UPDATE)
ADD_OCI_CONSTANT(OPCODE_DELETE)
ADD_OCI_CONSTANT(OPCODE_ALTER)
ADD_OCI_CONSTANT(OPCODE_DROP)
ADD_OCI_CONSTANT(OPCODE_UNKNOWN)
ADD_OCI_CONSTANT(SUBSCR_NAMESPACE_ANONYMOUS)
ADD_OCI_CONSTANT(SUBSCR_NAMESPACE_AQ)
ADD_OCI_CONSTANT(SUBSCR_NAMESPACE_DBCHANGE)
ADD_OCI_CONSTANT(SUBSCR_PROTO_OCI)
ADD_OCI_CONSTANT(SUBSCR_PROTO_MAIL)
ADD_OCI_CONSTANT(SUBSCR_PROTO_SERVER)
ADD_OCI_CONSTANT(SUBSCR_PROTO_HTTP)
#endif
#ifdef ORACLE_11G
ADD_OCI_CONSTANT(ATTR_PURITY_DEFAULT)