Added initial support for database change notification.
This commit is contained in:
parent
e00d8dc231
commit
78d619cfb6
46
Connection.c
46
Connection.c
@ -72,6 +72,7 @@ static int Connection_SetOCIAttr(udt_Connection*, PyObject*, ub4*);
|
||||
static PyObject *Connection_Ping(udt_Connection*, PyObject*);
|
||||
static PyObject *Connection_Shutdown(udt_Connection*, PyObject*, PyObject*);
|
||||
static PyObject *Connection_Startup(udt_Connection*, PyObject*, PyObject*);
|
||||
static PyObject *Connection_Subscribe(udt_Connection*, PyObject*, PyObject*);
|
||||
#endif
|
||||
|
||||
|
||||
@ -96,6 +97,8 @@ static PyMethodDef g_ConnectionMethods[] = {
|
||||
METH_VARARGS | METH_KEYWORDS},
|
||||
{ "startup", (PyCFunction) Connection_Startup,
|
||||
METH_VARARGS | METH_KEYWORDS},
|
||||
{ "subscribe", (PyCFunction) Connection_Subscribe,
|
||||
METH_VARARGS | METH_KEYWORDS},
|
||||
#endif
|
||||
{ "changepassword", (PyCFunction) Connection_ChangePasswordExternal,
|
||||
METH_VARARGS },
|
||||
@ -708,6 +711,9 @@ static int Connection_Connect(
|
||||
|
||||
#include "Cursor.c"
|
||||
#include "Callback.c"
|
||||
#ifdef ORACLE_10GR2
|
||||
#include "Subscription.c"
|
||||
#endif
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@ -1644,5 +1650,45 @@ static PyObject *Connection_Startup(
|
||||
Py_INCREF(Py_None);
|
||||
return Py_None;
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Connection_Subscribe()
|
||||
// Create a subscription to events that take place in the database.
|
||||
//-----------------------------------------------------------------------------
|
||||
static PyObject *Connection_Subscribe(
|
||||
udt_Connection *self, // connection
|
||||
PyObject* args, // arguments
|
||||
PyObject* keywordArgs) // keyword arguments
|
||||
{
|
||||
static char *keywordList[] = { "namespace", "protocol", "callback",
|
||||
"timeout", "operations", "rowids", NULL };
|
||||
ub4 namespace, protocol, timeout, rowids, operations;
|
||||
PyObject *rowidsObj, *callback;
|
||||
int temp;
|
||||
|
||||
// parse arguments
|
||||
timeout = rowids = 0;
|
||||
rowidsObj = callback = NULL;
|
||||
namespace = OCI_SUBSCR_NAMESPACE_DBCHANGE;
|
||||
protocol = OCI_SUBSCR_PROTO_OCI;
|
||||
operations = OCI_OPCODE_ALLOPS;
|
||||
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "|iiOiiO", keywordList,
|
||||
&namespace, &protocol, &callback, &timeout, &operations,
|
||||
&rowidsObj))
|
||||
return NULL;
|
||||
|
||||
// set the value for rowids
|
||||
if (rowidsObj) {
|
||||
temp = PyObject_IsTrue(rowidsObj);
|
||||
if (temp < 0)
|
||||
return NULL;
|
||||
if (temp)
|
||||
rowids = 1;
|
||||
}
|
||||
|
||||
return (PyObject*) Subscription_New(self, namespace, protocol, callback,
|
||||
timeout, operations, rowids);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
450
Subscription.c
Normal file
450
Subscription.c
Normal file
@ -0,0 +1,450 @@
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription.c
|
||||
// Defines the routines for handling Oracle subscription information.
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// structures used for handling subscriptions
|
||||
//-----------------------------------------------------------------------------
|
||||
typedef struct {
|
||||
PyObject_HEAD
|
||||
OCISubscription *handle;
|
||||
udt_Connection *connection;
|
||||
PyObject *callback;
|
||||
ub4 namespace;
|
||||
ub4 protocol;
|
||||
ub4 timeout;
|
||||
ub4 operations;
|
||||
ub4 rowids;
|
||||
} udt_Subscription;
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Declaration of subscription functions
|
||||
//-----------------------------------------------------------------------------
|
||||
static void Subscription_Free(udt_Subscription*);
|
||||
static PyObject *Subscription_Repr(udt_Subscription*);
|
||||
static PyObject *Subscription_RegisterQuery(udt_Subscription*, PyObject*);
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// declaration of methods for Python type "Subscription"
|
||||
//-----------------------------------------------------------------------------
|
||||
static PyMethodDef g_SubscriptionTypeMethods[] = {
|
||||
{ "registerquery", (PyCFunction) Subscription_RegisterQuery,
|
||||
METH_VARARGS },
|
||||
{ NULL, NULL }
|
||||
};
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Python type declarations
|
||||
//-----------------------------------------------------------------------------
|
||||
static PyTypeObject g_SubscriptionType = {
|
||||
PyVarObject_HEAD_INIT(NULL, 0)
|
||||
"cx_Oracle.Subscription", // tp_name
|
||||
sizeof(udt_Subscription), // tp_basicsize
|
||||
0, // tp_itemsize
|
||||
(destructor) Subscription_Free, // tp_dealloc
|
||||
0, // tp_print
|
||||
0, // tp_getattr
|
||||
0, // tp_setattr
|
||||
0, // tp_compare
|
||||
(reprfunc) Subscription_Repr, // tp_repr
|
||||
0, // tp_as_number
|
||||
0, // tp_as_sequence
|
||||
0, // tp_as_mapping
|
||||
0, // tp_hash
|
||||
0, // tp_call
|
||||
0, // tp_str
|
||||
0, // tp_getattro
|
||||
0, // tp_setattro
|
||||
0, // tp_as_buffer
|
||||
Py_TPFLAGS_DEFAULT, // tp_flags
|
||||
0, // tp_doc
|
||||
0, // tp_traverse
|
||||
0, // tp_clear
|
||||
0, // tp_richcompare
|
||||
0, // tp_weaklistoffset
|
||||
0, // tp_iter
|
||||
0, // tp_iternext
|
||||
g_SubscriptionTypeMethods, // tp_methods
|
||||
0, // tp_members
|
||||
0, // tp_getset
|
||||
0, // tp_base
|
||||
0, // tp_dict
|
||||
0, // tp_descr_get
|
||||
0, // tp_descr_set
|
||||
0, // tp_dictoffset
|
||||
0, // tp_init
|
||||
0, // tp_alloc
|
||||
0, // tp_new
|
||||
0, // tp_free
|
||||
0, // tp_is_gc
|
||||
0 // tp_bases
|
||||
};
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_CallbackHandler()
|
||||
// Routine that performs the actual call.
|
||||
//-----------------------------------------------------------------------------
|
||||
static int Subscription_CallbackHandler(
|
||||
udt_Subscription *self) // subscription object
|
||||
{
|
||||
PyObject *result, *args;
|
||||
|
||||
// create the arguments for the call
|
||||
printf("creating arguments for call...\n");
|
||||
args = PyTuple_New(0);
|
||||
if (!args)
|
||||
return -1;
|
||||
|
||||
printf("making actual call...\n");
|
||||
// make the actual call
|
||||
result = PyObject_Call(self->callback, args, NULL);
|
||||
printf("Result is %x\n",result);
|
||||
Py_DECREF(args);
|
||||
if (!result)
|
||||
return -1;
|
||||
Py_DECREF(result);
|
||||
|
||||
printf("all looks good here\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_Callback()
|
||||
// Routine that is called when a callback needs to be invoked.
|
||||
//-----------------------------------------------------------------------------
|
||||
static void Subscription_Callback(
|
||||
udt_Subscription *self, // subscription object
|
||||
OCISubscription *handle, // subscription handle
|
||||
dvoid *payload, // payload
|
||||
ub4 *payloadLength, // payload length
|
||||
dvoid *changeDescriptor, // change descriptor
|
||||
ub4 mode) // mode used
|
||||
{
|
||||
#ifdef WITH_THREAD
|
||||
PyThreadState *threadState;
|
||||
#endif
|
||||
|
||||
printf("Received notification...\n");
|
||||
printf("self is %x\n", self);
|
||||
// create new thread state, if necessary
|
||||
#ifdef WITH_THREAD
|
||||
threadState = PyThreadState_Swap(NULL);
|
||||
if (threadState) {
|
||||
PyThreadState_Swap(threadState);
|
||||
threadState = NULL;
|
||||
} else {
|
||||
printf("creating new thread state....\n");
|
||||
threadState = PyThreadState_New(g_InterpreterState);
|
||||
if (!threadState) {
|
||||
PyErr_Print();
|
||||
return;
|
||||
}
|
||||
PyEval_AcquireThread(threadState);
|
||||
}
|
||||
#endif
|
||||
|
||||
// perform the call
|
||||
printf("performing the call...\n");
|
||||
if (Subscription_CallbackHandler(self) < 0)
|
||||
PyErr_Print();
|
||||
|
||||
// restore thread state, if necessary
|
||||
#ifdef WITH_THREAD
|
||||
if (threadState) {
|
||||
printf("restoring thread state...\n");
|
||||
PyThreadState_Clear(threadState);
|
||||
PyEval_ReleaseThread(threadState);
|
||||
PyThreadState_Delete(threadState);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_Register()
|
||||
// Register the subscription.
|
||||
//-----------------------------------------------------------------------------
|
||||
static int Subscription_Register(
|
||||
udt_Subscription *self) // subscription to register
|
||||
{
|
||||
udt_Environment *env;
|
||||
sword status;
|
||||
|
||||
printf("creating registration with pointer %x\n", self);
|
||||
// create the subscription handle
|
||||
env = self->connection->environment;
|
||||
status = OCIHandleAlloc(env->handle, (dvoid**) &self->handle,
|
||||
OCI_HTYPE_SUBSCRIPTION, 0, 0);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): allocate handle") < 0)
|
||||
return -1;
|
||||
|
||||
// set the namespace
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) &self->namespace, sizeof(ub4), OCI_ATTR_SUBSCR_NAMESPACE,
|
||||
env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set namespace") < 0)
|
||||
return -1;
|
||||
|
||||
// set the protocol
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) &self->protocol, sizeof(ub4), OCI_ATTR_SUBSCR_RECPTPROTO,
|
||||
env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set protocol") < 0)
|
||||
return -1;
|
||||
|
||||
// set the timeout
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) &self->timeout, sizeof(ub4), OCI_ATTR_SUBSCR_TIMEOUT,
|
||||
env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set timeout") < 0)
|
||||
return -1;
|
||||
|
||||
// set the context for the callback
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) self, 0, OCI_ATTR_SUBSCR_CTX, env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set context") < 0)
|
||||
return -1;
|
||||
|
||||
// set the callback, if applicable
|
||||
if (self->callback) {
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) Subscription_Callback, 0, OCI_ATTR_SUBSCR_CALLBACK,
|
||||
env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set callback") < 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// set whether or not rowids are desired
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) &self->rowids, sizeof(ub4), OCI_ATTR_CHNF_ROWIDS,
|
||||
env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set rowids") < 0)
|
||||
return -1;
|
||||
|
||||
// set which operations are desired
|
||||
status = OCIAttrSet(self->handle, OCI_HTYPE_SUBSCRIPTION,
|
||||
(dvoid*) &self->operations, sizeof(ub4), OCI_ATTR_CHNF_OPERATIONS,
|
||||
env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): set operations") < 0)
|
||||
return -1;
|
||||
|
||||
// register the subscription
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
status = OCISubscriptionRegister(self->connection->handle,
|
||||
&self->handle, 1, env->errorHandle, OCI_DEFAULT);
|
||||
Py_END_ALLOW_THREADS
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_Register(): register") < 0)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_New()
|
||||
// Allocate a new subscription object.
|
||||
//-----------------------------------------------------------------------------
|
||||
static udt_Subscription *Subscription_New(
|
||||
udt_Connection *connection, // connection object
|
||||
ub4 namespace, // namespace to use
|
||||
ub4 protocol, // protocol to use
|
||||
PyObject *callback, // callback routine
|
||||
ub4 timeout, // timeout (in seconds)
|
||||
ub4 operations, // operations to notify
|
||||
int rowids) // retrieve rowids?
|
||||
{
|
||||
udt_Subscription *self;
|
||||
|
||||
self = (udt_Subscription*)
|
||||
g_SubscriptionType.tp_alloc(&g_SubscriptionType, 0);
|
||||
if (!self)
|
||||
return NULL;
|
||||
Py_INCREF(connection);
|
||||
self->connection = connection;
|
||||
Py_XINCREF(callback);
|
||||
self->callback = callback;
|
||||
self->namespace = namespace;
|
||||
self->protocol = protocol;
|
||||
self->timeout = timeout;
|
||||
self->rowids = rowids;
|
||||
self->operations = operations;
|
||||
self->handle = NULL;
|
||||
if (Subscription_Register(self) < 0) {
|
||||
Py_DECREF(self);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_Free()
|
||||
// Free the memory associated with an object type.
|
||||
//-----------------------------------------------------------------------------
|
||||
static void Subscription_Free(
|
||||
udt_Subscription *self) // subscription to free
|
||||
{
|
||||
if (self->handle)
|
||||
OCISubscriptionUnRegister(self->connection->handle,
|
||||
self->handle, self->connection->environment->errorHandle,
|
||||
OCI_DEFAULT);
|
||||
Py_CLEAR(self->connection);
|
||||
Py_CLEAR(self->callback);
|
||||
Py_TYPE(self)->tp_free((PyObject*) self);
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_Repr()
|
||||
// Return a string representation of the subscription.
|
||||
//-----------------------------------------------------------------------------
|
||||
static PyObject *Subscription_Repr(
|
||||
udt_Subscription *subscription) // subscription to repr
|
||||
{
|
||||
PyObject *connectionRepr, *module, *name, *result, *format, *formatArgs;
|
||||
|
||||
format = cxString_FromAscii("<%s.%s on %s>");
|
||||
if (!format)
|
||||
return NULL;
|
||||
connectionRepr = PyObject_Repr((PyObject*) subscription->connection);
|
||||
if (!connectionRepr) {
|
||||
Py_DECREF(format);
|
||||
return NULL;
|
||||
}
|
||||
if (GetModuleAndName(Py_TYPE(subscription), &module, &name) < 0) {
|
||||
Py_DECREF(format);
|
||||
Py_DECREF(connectionRepr);
|
||||
return NULL;
|
||||
}
|
||||
formatArgs = PyTuple_Pack(3, module, name, connectionRepr);
|
||||
Py_DECREF(module);
|
||||
Py_DECREF(name);
|
||||
Py_DECREF(connectionRepr);
|
||||
if (!formatArgs) {
|
||||
Py_DECREF(format);
|
||||
return NULL;
|
||||
}
|
||||
result = cxString_Format(format, formatArgs);
|
||||
Py_DECREF(format);
|
||||
Py_DECREF(formatArgs);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Subscription_RegisterQuery()
|
||||
// Register a query for database change notification.
|
||||
//-----------------------------------------------------------------------------
|
||||
static PyObject *Subscription_RegisterQuery(
|
||||
udt_Subscription *self, // subscription to use
|
||||
PyObject *args) // arguments
|
||||
{
|
||||
PyObject *statement, *executeArgs;
|
||||
udt_StringBuffer statementBuffer;
|
||||
udt_Environment *env;
|
||||
udt_Cursor *cursor;
|
||||
sword status;
|
||||
|
||||
// parse arguments
|
||||
executeArgs = NULL;
|
||||
if (!PyArg_ParseTuple(args, "O!|O", cxString_Type, &statement,
|
||||
&executeArgs))
|
||||
return NULL;
|
||||
if (executeArgs) {
|
||||
if (!PyDict_Check(executeArgs) && !PySequence_Check(executeArgs)) {
|
||||
PyErr_SetString(PyExc_TypeError,
|
||||
"expecting a dictionary or sequence");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// create cursor to perform query
|
||||
env = self->connection->environment;
|
||||
cursor = (udt_Cursor*) Connection_NewCursor(self->connection, NULL);
|
||||
if (!cursor)
|
||||
return NULL;
|
||||
|
||||
// allocate the handle so the subscription handle can be set
|
||||
if (Cursor_AllocateHandle(cursor) < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// prepare the statement for execution
|
||||
if (StringBuffer_Fill(&statementBuffer, statement) < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
status = OCIStmtPrepare(cursor->handle, env->errorHandle,
|
||||
(text*) statementBuffer.ptr, statementBuffer.size, OCI_NTV_SYNTAX,
|
||||
OCI_DEFAULT);
|
||||
StringBuffer_Clear(&statementBuffer);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_RegisterQuery(): prepare statement") < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// perform binds
|
||||
if (executeArgs && Cursor_SetBindVariables(cursor, executeArgs, 1, 0,
|
||||
0) < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
if (Cursor_PerformBind(cursor) < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// parse the query in order to get the defined variables
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
status = OCIStmtExecute(self->connection->handle, cursor->handle,
|
||||
env->errorHandle, 0, 0, 0, 0, OCI_DESCRIBE_ONLY);
|
||||
Py_END_ALLOW_THREADS
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_RegisterQuery(): parse statement") < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// perform define as needed
|
||||
if (Cursor_PerformDefine(cursor) < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// set the subscription handle
|
||||
status = OCIAttrSet(cursor->handle, OCI_HTYPE_STMT, self->handle, 0,
|
||||
OCI_ATTR_CHNF_REGHANDLE, env->errorHandle);
|
||||
if (Environment_CheckForError(env, status,
|
||||
"Subscription_RegisterQuery(): set subscription handle") < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// execute the query which registers it
|
||||
if (Cursor_InternalExecute(cursor, 1) < 0) {
|
||||
Py_DECREF(cursor);
|
||||
return NULL;
|
||||
}
|
||||
Py_DECREF(cursor);
|
||||
|
||||
Py_INCREF(Py_None);
|
||||
return Py_None;
|
||||
}
|
||||
|
||||
@ -342,6 +342,9 @@ static PyObject *Module_Initialize(void)
|
||||
MAKE_TYPE_READY(&g_ObjectAttributeType);
|
||||
MAKE_TYPE_READY(&g_ExternalLobVarType);
|
||||
MAKE_TYPE_READY(&g_ExternalObjectVarType);
|
||||
#ifdef ORACLE_10GR2
|
||||
MAKE_TYPE_READY(&g_SubscriptionType);
|
||||
#endif
|
||||
MAKE_VARIABLE_TYPE_READY(&g_StringVarType);
|
||||
MAKE_VARIABLE_TYPE_READY(&g_FixedCharVarType);
|
||||
MAKE_VARIABLE_TYPE_READY(&g_RowidVarType);
|
||||
|
||||
3
setup.py
3
setup.py
@ -293,7 +293,8 @@ extension = Extension(
|
||||
"ExternalLobVar.c", "ExternalObjectVar.c", "IntervalVar.c",
|
||||
"LobVar.c", "LongVar.c", "NumberVar.c", "ObjectType.c",
|
||||
"ObjectVar.c", "SessionPool.c", "StringUtils.c", "StringVar.c",
|
||||
"TimestampVar.c", "Transforms.c", "Variable.c"])
|
||||
"Subscription.c", "TimestampVar.c", "Transforms.c",
|
||||
"Variable.c"])
|
||||
|
||||
# perform the setup
|
||||
setup(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user