//----------------------------------------------------------------------------- // Copyright 2016-2018, Oracle and/or its affiliates. All rights reserved. // // Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. // // Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta, // Canada. All rights reserved. //----------------------------------------------------------------------------- //----------------------------------------------------------------------------- // cxoSubscr.c // Defines the routines for handling Oracle subscription information. //----------------------------------------------------------------------------- #include "cxoModule.h" //----------------------------------------------------------------------------- // Declaration of subscription functions //----------------------------------------------------------------------------- static void cxoSubscr_free(cxoSubscr*); static PyObject *cxoSubscr_repr(cxoSubscr*); static PyObject *cxoSubscr_registerQuery(cxoSubscr*, PyObject*); static void cxoMessage_free(cxoMessage*); static void cxoMessageTable_free(cxoMessageTable*); static void cxoMessageRow_free(cxoMessageRow*); static void cxoMessageQuery_free(cxoMessageQuery*); //----------------------------------------------------------------------------- // declaration of members for Python types //----------------------------------------------------------------------------- static PyMemberDef cxoSubscrTypeMembers[] = { { "callback", T_OBJECT, offsetof(cxoSubscr, callback), READONLY }, { "connection", T_OBJECT, offsetof(cxoSubscr, connection), READONLY }, { "namespace", T_INT, offsetof(cxoSubscr, namespace), READONLY }, { "protocol", T_INT, offsetof(cxoSubscr, protocol), READONLY }, { "port", T_INT, offsetof(cxoSubscr, port), READONLY }, { "timeout", T_INT, offsetof(cxoSubscr, timeout), READONLY }, { "operations", T_INT, offsetof(cxoSubscr, operations), READONLY }, { "qos", T_INT, offsetof(cxoSubscr, qos), READONLY }, { "id", T_ULONG, offsetof(cxoSubscr, id), READONLY }, { NULL } }; static PyMemberDef cxoMessageTypeMembers[] = { { "subscription", T_OBJECT, offsetof(cxoMessage, subscription), READONLY }, { "type", T_INT, offsetof(cxoMessage, type), READONLY }, { "dbname", T_OBJECT, offsetof(cxoMessage, dbname), READONLY }, { "txid", T_OBJECT, offsetof(cxoMessage, txId), READONLY }, { "tables", T_OBJECT, offsetof(cxoMessage, tables), READONLY }, { "queries", T_OBJECT, offsetof(cxoMessage, queries), READONLY }, { NULL } }; static PyMemberDef cxoMessageTableTypeMembers[] = { { "name", T_OBJECT, offsetof(cxoMessageTable, name), READONLY }, { "rows", T_OBJECT, offsetof(cxoMessageTable, rows), READONLY }, { "operation", T_INT, offsetof(cxoMessageTable, operation), READONLY }, { NULL } }; static PyMemberDef cxoMessageRowTypeMembers[] = { { "rowid", T_OBJECT, offsetof(cxoMessageRow, rowid), READONLY }, { "operation", T_INT, offsetof(cxoMessageRow, operation), READONLY }, { NULL } }; static PyMemberDef cxoMessageQueryTypeMembers[] = { { "id", T_INT, offsetof(cxoMessageQuery, id), READONLY }, { "operation", T_INT, offsetof(cxoMessageQuery, operation), READONLY }, { "tables", T_OBJECT, offsetof(cxoMessageQuery, tables), READONLY }, { NULL } }; //----------------------------------------------------------------------------- // declaration of methods for Python types //----------------------------------------------------------------------------- static PyMethodDef cxoSubscrTypeMethods[] = { { "registerquery", (PyCFunction) cxoSubscr_registerQuery, METH_VARARGS }, { NULL, NULL } }; //----------------------------------------------------------------------------- // Python type declarations //----------------------------------------------------------------------------- PyTypeObject cxoPyTypeSubscr = { PyVarObject_HEAD_INIT(NULL, 0) "cx_Oracle.Subscription", // tp_name sizeof(cxoSubscr), // tp_basicsize 0, // tp_itemsize (destructor) cxoSubscr_free, // tp_dealloc 0, // tp_print 0, // tp_getattr 0, // tp_setattr 0, // tp_compare (reprfunc) cxoSubscr_repr, // tp_repr 0, // tp_as_number 0, // tp_as_sequence 0, // tp_as_mapping 0, // tp_hash 0, // tp_call 0, // tp_str 0, // tp_getattro 0, // tp_setattro 0, // tp_as_buffer Py_TPFLAGS_DEFAULT, // tp_flags 0, // tp_doc 0, // tp_traverse 0, // tp_clear 0, // tp_richcompare 0, // tp_weaklistoffset 0, // tp_iter 0, // tp_iternext cxoSubscrTypeMethods, // tp_methods cxoSubscrTypeMembers, // tp_members 0, // tp_getset 0, // tp_base 0, // tp_dict 0, // tp_descr_get 0, // tp_descr_set 0, // tp_dictoffset 0, // tp_init 0, // tp_alloc 0, // tp_new 0, // tp_free 0, // tp_is_gc 0 // tp_bases }; PyTypeObject cxoPyTypeMessage = { PyVarObject_HEAD_INIT(NULL, 0) "cx_Oracle.Message", // tp_name sizeof(cxoMessage), // tp_basicsize 0, // tp_itemsize (destructor) cxoMessage_free, // tp_dealloc 0, // tp_print 0, // tp_getattr 0, // tp_setattr 0, // tp_compare 0, // tp_repr 0, // tp_as_number 0, // tp_as_sequence 0, // tp_as_mapping 0, // tp_hash 0, // tp_call 0, // tp_str 0, // tp_getattro 0, // tp_setattro 0, // tp_as_buffer Py_TPFLAGS_DEFAULT, // tp_flags 0, // tp_doc 0, // tp_traverse 0, // tp_clear 0, // tp_richcompare 0, // tp_weaklistoffset 0, // tp_iter 0, // tp_iternext 0, // tp_methods cxoMessageTypeMembers, // tp_members 0, // tp_getset 0, // tp_base 0, // tp_dict 0, // tp_descr_get 0, // tp_descr_set 0, // tp_dictoffset 0, // tp_init 0, // tp_alloc 0, // tp_new 0, // tp_free 0, // tp_is_gc 0 // tp_bases }; PyTypeObject cxoPyTypeMessageTable = { PyVarObject_HEAD_INIT(NULL, 0) "cx_Oracle.MessageTable", // tp_name sizeof(cxoMessageTable), // tp_basicsize 0, // tp_itemsize (destructor) cxoMessageTable_free, // tp_dealloc 0, // tp_print 0, // tp_getattr 0, // tp_setattr 0, // tp_compare 0, // tp_repr 0, // tp_as_number 0, // tp_as_sequence 0, // tp_as_mapping 0, // tp_hash 0, // tp_call 0, // tp_str 0, // tp_getattro 0, // tp_setattro 0, // tp_as_buffer Py_TPFLAGS_DEFAULT, // tp_flags 0, // tp_doc 0, // tp_traverse 0, // tp_clear 0, // tp_richcompare 0, // tp_weaklistoffset 0, // tp_iter 0, // tp_iternext 0, // tp_methods cxoMessageTableTypeMembers, // tp_members 0, // tp_getset 0, // tp_base 0, // tp_dict 0, // tp_descr_get 0, // tp_descr_set 0, // tp_dictoffset 0, // tp_init 0, // tp_alloc 0, // tp_new 0, // tp_free 0, // tp_is_gc 0 // tp_bases }; PyTypeObject cxoPyTypeMessageRow = { PyVarObject_HEAD_INIT(NULL, 0) "cx_Oracle.MessageRow", // tp_name sizeof(cxoMessageRow), // tp_basicsize 0, // tp_itemsize (destructor) cxoMessageRow_free, // tp_dealloc 0, // tp_print 0, // tp_getattr 0, // tp_setattr 0, // tp_compare 0, // tp_repr 0, // tp_as_number 0, // tp_as_sequence 0, // tp_as_mapping 0, // tp_hash 0, // tp_call 0, // tp_str 0, // tp_getattro 0, // tp_setattro 0, // tp_as_buffer Py_TPFLAGS_DEFAULT, // tp_flags 0, // tp_doc 0, // tp_traverse 0, // tp_clear 0, // tp_richcompare 0, // tp_weaklistoffset 0, // tp_iter 0, // tp_iternext 0, // tp_methods cxoMessageRowTypeMembers, // tp_members 0, // tp_getset 0, // tp_base 0, // tp_dict 0, // tp_descr_get 0, // tp_descr_set 0, // tp_dictoffset 0, // tp_init 0, // tp_alloc 0, // tp_new 0, // tp_free 0, // tp_is_gc 0 // tp_bases }; PyTypeObject cxoPyTypeMessageQuery = { PyVarObject_HEAD_INIT(NULL, 0) "cx_Oracle.MessageQuery", // tp_name sizeof(cxoMessageQuery), // tp_basicsize 0, // tp_itemsize (destructor) cxoMessageQuery_free, // tp_dealloc 0, // tp_print 0, // tp_getattr 0, // tp_setattr 0, // tp_compare 0, // tp_repr 0, // tp_as_number 0, // tp_as_sequence 0, // tp_as_mapping 0, // tp_hash 0, // tp_call 0, // tp_str 0, // tp_getattro 0, // tp_setattro 0, // tp_as_buffer Py_TPFLAGS_DEFAULT, // tp_flags 0, // tp_doc 0, // tp_traverse 0, // tp_clear 0, // tp_richcompare 0, // tp_weaklistoffset 0, // tp_iter 0, // tp_iternext 0, // tp_methods cxoMessageQueryTypeMembers, // tp_members 0, // tp_getset 0, // tp_base 0, // tp_dict 0, // tp_descr_get 0, // tp_descr_set 0, // tp_dictoffset 0, // tp_init 0, // tp_alloc 0, // tp_new 0, // tp_free 0, // tp_is_gc 0 // tp_bases }; //----------------------------------------------------------------------------- // cxoMessageRow_initialize() // Initialize a new message row with the information from the descriptor. //----------------------------------------------------------------------------- static int cxoMessageRow_initialize(cxoMessageRow *rowObj, const char *encoding, dpiSubscrMessageRow *row) { rowObj->operation = row->operation; rowObj->rowid = cxoPyString_fromEncodedString(row->rowid, row->rowidLength, encoding); if (!rowObj->rowid) return -1; return 0; } //----------------------------------------------------------------------------- // cxoMessageTable_initialize() // Initialize a new message table with the information from the descriptor. //----------------------------------------------------------------------------- static int cxoMessageTable_initialize(cxoMessageTable *tableObj, const char *encoding, dpiSubscrMessageTable *table) { cxoMessageRow *row; uint32_t i; tableObj->operation = table->operation; tableObj->name = cxoPyString_fromEncodedString(table->name, table->nameLength, encoding); tableObj->rows = PyList_New(table->numRows); if (!tableObj->rows) return -1; for (i = 0; i < table->numRows; i++) { row = (cxoMessageRow*) cxoPyTypeMessageRow.tp_alloc(&cxoPyTypeMessageRow, 0); if (!row) return -1; PyList_SET_ITEM(tableObj->rows, i, (PyObject*) row); if (cxoMessageRow_initialize(row, encoding, &table->rows[i]) < 0) return -1; } return 0; } //----------------------------------------------------------------------------- // cxoMessageQuery_initialize() // Initialize a new message query with the information from the descriptor. //----------------------------------------------------------------------------- static int cxoMessageQuery_initialize(cxoMessageQuery *queryObj, const char *encoding, dpiSubscrMessageQuery *query) { cxoMessageTable *table; uint32_t i; queryObj->id = query->id; queryObj->operation = query->operation; queryObj->tables = PyList_New(query->numTables); if (!queryObj->tables) return -1; for (i = 0; i < query->numTables; i++) { table = (cxoMessageTable*) cxoPyTypeMessageTable.tp_alloc(&cxoPyTypeMessageTable, 0); if (!table) return -1; PyList_SET_ITEM(queryObj->tables, i, (PyObject*) table); if (cxoMessageTable_initialize(table, encoding, &query->tables[i]) < 0) return -1; } return 0; } //----------------------------------------------------------------------------- // cxoMessage_initialize() // Initialize a new message with the information from the descriptor. //----------------------------------------------------------------------------- static int cxoMessage_initialize(cxoMessage *messageObj, cxoSubscr *subscription, dpiSubscrMessage *message) { cxoMessageTable *table; cxoMessageQuery *query; const char *encoding; uint32_t i; Py_INCREF(subscription); messageObj->subscription = subscription; encoding = subscription->connection->encodingInfo.encoding; messageObj->type = message->eventType; messageObj->dbname = cxoPyString_fromEncodedString(message->dbName, message->dbNameLength, encoding); if (!messageObj->dbname) return -1; if (message->txId) { messageObj->txId = PyBytes_FromStringAndSize(message->txId, message->txIdLength); if (!messageObj->txId) return -1; } switch (message->eventType) { case DPI_EVENT_OBJCHANGE: messageObj->tables = PyList_New(message->numTables); if (!messageObj->tables) return -1; for (i = 0; i < message->numTables; i++) { table = (cxoMessageTable*) cxoPyTypeMessageTable.tp_alloc(&cxoPyTypeMessageTable, 0); if (!table) return -1; PyList_SET_ITEM(messageObj->tables, i, (PyObject*) table); if (cxoMessageTable_initialize(table, encoding, &message->tables[i]) < 0) return -1; } break; case DPI_EVENT_QUERYCHANGE: messageObj->queries = PyList_New(message->numQueries); if (!messageObj->queries) return -1; for (i = 0; i < message->numQueries; i++) { query = (cxoMessageQuery*) cxoPyTypeMessageQuery.tp_alloc(&cxoPyTypeMessageQuery, 0); if (!query) return -1; PyList_SET_ITEM(messageObj->queries, i, (PyObject*) query); if (cxoMessageQuery_initialize(query, encoding, &message->queries[i]) < 0) return -1; } break; default: break; } return 0; } //----------------------------------------------------------------------------- // cxoSubscr_callbackHandler() // Routine that performs the actual call. //----------------------------------------------------------------------------- static int cxoSubscr_callbackHandler(cxoSubscr *subscr, dpiSubscrMessage *message) { PyObject *result, *args; cxoMessage *messageObj; // create the message messageObj = (cxoMessage*) cxoPyTypeMessage.tp_alloc(&cxoPyTypeMessage, 0); if (!messageObj) return -1; if (cxoMessage_initialize(messageObj, subscr, message) < 0) { Py_DECREF(messageObj); return -1; } // create the arguments for the call args = PyTuple_Pack(1, messageObj); Py_DECREF(messageObj); if (!args) return -1; // make the actual call result = PyObject_Call(subscr->callback, args, NULL); Py_DECREF(args); if (!result) return -1; Py_DECREF(result); return 0; } //----------------------------------------------------------------------------- // cxoSubscr_callback() // Routine that is called when a callback needs to be invoked. //----------------------------------------------------------------------------- static void cxoSubscr_callback(cxoSubscr *subscr, dpiSubscrMessage *message) { #ifdef WITH_THREAD PyGILState_STATE gstate = PyGILState_Ensure(); #endif if (message->errorInfo) { cxoError_raiseFromInfo(message->errorInfo); PyErr_Print(); } else if (cxoSubscr_callbackHandler(subscr, message) < 0) PyErr_Print(); #ifdef WITH_THREAD PyGILState_Release(gstate); #endif } //----------------------------------------------------------------------------- // cxoSubscr_new() // Allocate a new subscription object. //----------------------------------------------------------------------------- cxoSubscr *cxoSubscr_new(cxoConnection *connection, uint32_t namespace, uint32_t protocol, uint32_t port, PyObject *callback, uint32_t timeout, uint32_t operations, uint32_t qos) { dpiSubscrCreateParams params; cxoSubscr *subscr; subscr = (cxoSubscr*) cxoPyTypeSubscr.tp_alloc(&cxoPyTypeSubscr, 0); if (!subscr) return NULL; Py_INCREF(connection); subscr->connection = connection; Py_XINCREF(callback); subscr->callback = callback; subscr->namespace = namespace; subscr->protocol = protocol; subscr->port = port; subscr->timeout = timeout; subscr->operations = operations; subscr->qos = qos; if (dpiContext_initSubscrCreateParams(cxoDpiContext, ¶ms) < 0) { cxoError_raiseAndReturnNull(); return NULL; } params.subscrNamespace = namespace; params.protocol = protocol; params.portNumber = port; if (callback) { params.callback = (dpiSubscrCallback) cxoSubscr_callback; params.callbackContext = subscr; } params.timeout = timeout; params.operations = operations; params.qos = qos; if (dpiConn_newSubscription(connection->handle, ¶ms, &subscr->handle, &subscr->id) < 0) { cxoError_raiseAndReturnNull(); Py_DECREF(subscr); return NULL; } return subscr; } //----------------------------------------------------------------------------- // cxoSubscr_free() // Free the memory associated with a subscription. //----------------------------------------------------------------------------- static void cxoSubscr_free(cxoSubscr *subscr) { if (subscr->handle) { dpiSubscr_release(subscr->handle); subscr->handle = NULL; } Py_CLEAR(subscr->connection); Py_CLEAR(subscr->callback); Py_TYPE(subscr)->tp_free((PyObject*) subscr); } //----------------------------------------------------------------------------- // cxoSubscr_repr() // Return a string representation of the subscription. //----------------------------------------------------------------------------- static PyObject *cxoSubscr_repr(cxoSubscr *subscription) { PyObject *connectionRepr, *module, *name, *result; connectionRepr = PyObject_Repr((PyObject*) subscription->connection); if (!connectionRepr) return NULL; if (cxoUtils_getModuleAndName(Py_TYPE(subscription), &module, &name) < 0) { Py_DECREF(connectionRepr); return NULL; } result = cxoUtils_formatString("<%s.%s on %s>", PyTuple_Pack(3, module, name, connectionRepr)); Py_DECREF(module); Py_DECREF(name); Py_DECREF(connectionRepr); return result; } //----------------------------------------------------------------------------- // cxoSubscr_registerQuery() // Register a query for database change notification. //----------------------------------------------------------------------------- static PyObject *cxoSubscr_registerQuery(cxoSubscr *subscr, PyObject *args) { PyObject *statement, *executeArgs; cxoBuffer statementBuffer; uint32_t numQueryColumns; cxoCursor *cursor; uint64_t queryId; int status; // parse arguments executeArgs = NULL; if (!PyArg_ParseTuple(args, "O|O", &statement, &executeArgs)) return NULL; if (executeArgs) { if (!PyDict_Check(executeArgs) && !PySequence_Check(executeArgs)) { PyErr_SetString(PyExc_TypeError, "expecting a dictionary or sequence"); return NULL; } } // create cursor to perform query cursor = (cxoCursor*) PyObject_CallMethod((PyObject*) subscr->connection, "cursor", NULL); if (!cursor) return NULL; // prepare the statement for execution if (cxoBuffer_fromObject(&statementBuffer, statement, subscr->connection->encodingInfo.encoding) < 0) { Py_DECREF(cursor); return NULL; } status = dpiSubscr_prepareStmt(subscr->handle, statementBuffer.ptr, statementBuffer.size, &cursor->handle); cxoBuffer_clear(&statementBuffer); if (status < 0) { cxoError_raiseAndReturnNull(); Py_DECREF(cursor); return NULL; } // perform binds if (executeArgs && cxoCursor_setBindVariables(cursor, executeArgs, 1, 0, 0) < 0) { Py_DECREF(cursor); return NULL; } if (cxoCursor_performBind(cursor) < 0) { Py_DECREF(cursor); return NULL; } // perform the execute (which registers the query) Py_BEGIN_ALLOW_THREADS status = dpiStmt_execute(cursor->handle, DPI_MODE_EXEC_DEFAULT, &numQueryColumns); Py_END_ALLOW_THREADS if (status < 0) { cxoError_raiseAndReturnNull(); Py_DECREF(cursor); return NULL; } // return the query id, if applicable if (subscr->qos & DPI_SUBSCR_QOS_QUERY) { if (dpiStmt_getSubscrQueryId(cursor->handle, &queryId) < 0) { cxoError_raiseAndReturnNull(); Py_DECREF(cursor); return NULL; } Py_DECREF(cursor); return PyInt_FromLong((long) queryId); } Py_DECREF(cursor); Py_RETURN_NONE; } //----------------------------------------------------------------------------- // cxoMessage_free() // Free the memory associated with a message. //----------------------------------------------------------------------------- static void cxoMessage_free(cxoMessage *message) { Py_CLEAR(message->subscription); Py_CLEAR(message->dbname); Py_CLEAR(message->tables); Py_CLEAR(message->queries); Py_TYPE(message)->tp_free((PyObject*) message); } //----------------------------------------------------------------------------- // cxoMessageTable_free() // Free the memory associated with a table in a message. //----------------------------------------------------------------------------- static void cxoMessageTable_free(cxoMessageTable *table) { Py_CLEAR(table->name); Py_CLEAR(table->rows); Py_TYPE(table)->tp_free((PyObject*) table); } //----------------------------------------------------------------------------- // cxoMessageRow_free() // Free the memory associated with a row in a message. //----------------------------------------------------------------------------- static void cxoMessageRow_free(cxoMessageRow *row) { Py_CLEAR(row->rowid); Py_TYPE(row)->tp_free((PyObject*) row); } //----------------------------------------------------------------------------- // cxoMessageQuery_free() // Free the memory associated with a query in a message. //----------------------------------------------------------------------------- static void cxoMessageQuery_free(cxoMessageQuery *query) { Py_CLEAR(query->tables); Py_TYPE(query)->tp_free((PyObject*) query); }