Added support for SODA bulk insert available in Oracle Client 18.5 and higher.

This commit is contained in:
Anthony Tuininga 2019-03-21 14:59:14 -06:00
parent 43ebe8afed
commit e567eb3a58
6 changed files with 257 additions and 47 deletions

View File

@ -181,6 +181,33 @@ SODA Collection Object
.. versionadded:: 7.0 .. versionadded:: 7.0
.. method:: SodaCollection.insertMany(docs)
Inserts a list of documents into the collection at one time. Each of the
input documents can be a dictionary or list or an existing :ref:`SODA
document object <sodadoc>`.
.. note::
This method requires Oracle Client 18.5 and higher.
.. versionadded:: 7.2
.. method:: SodaCollection.insertManyAndGet(docs)
Similarly to :meth:`~SodaCollection.insertMany()` this method inserts a
list of documents into the collection at one time. The only difference is
that it returns a list of :ref:`SODA Document objects <sodadoc>`. Note that
for performance reasons the returned documents do not contain the content.
.. note::
This method requires Oracle Client 18.5 and higher.
.. versionadded:: 7.2
.. method:: SodaCollection.insertOne(doc) .. method:: SodaCollection.insertOne(doc)
Inserts a given document into the collection. The input document can be a Inserts a given document into the collection. The input document can be a

53
samples/SodaBulkInsert.py Normal file
View File

@ -0,0 +1,53 @@
#------------------------------------------------------------------------------
# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# SodaBulkInsert.py
# Demonstrates the use of SODA bulk insert.
#
# This script requires cx_Oracle 7.2 or higher.
# Oracle Client must be at 18.5 or higher.
# Oracle Database must be at 18.1 or higher.
# The user must have been granted the SODA_APP privilege.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import SampleEnv
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString())

# the general recommendation for simple SODA usage is to enable autocommit
connection.autocommit = True

# create the parent object for all SODA work
soda = connection.getSodaDatabase()

# create a new (or open an existing) SODA collection
collection = soda.createCollection("SodaBulkInsert")

# remove all documents from the collection so each run starts empty
collection.find().remove()

# define some documents that will be stored
inDocs = [
    dict(name="Sam", age=8),
    dict(name="George", age=46),
    dict(name="Bill", age=35),
    dict(name="Sally", age=43),
    dict(name="Jill", age=28),
    dict(name="Cynthia", age=12)
]

# perform bulk insert; the documents returned carry the generated keys but,
# for performance reasons, not the content
resultDocs = collection.insertManyAndGet(inDocs)
for doc in resultDocs:
    print("Inserted SODA document with key", doc.key)
print()

# perform search of all persons under the age of 40
print("Persons under the age of 40:")
for doc in collection.find().filter({'age': {'$lt': 40}}).getDocuments():
    print(doc.getContent()["name"] + ",", "key", doc.key)

View File

@ -526,7 +526,7 @@ int cxoUtils_getModuleAndName(PyTypeObject *type, PyObject **module,
int cxoUtils_initializeDPI(void); int cxoUtils_initializeDPI(void);
int cxoUtils_processJsonArg(PyObject *arg, cxoBuffer *buffer); int cxoUtils_processJsonArg(PyObject *arg, cxoBuffer *buffer);
int cxoUtils_processSodaDocArg(cxoSodaDatabase *db, PyObject *arg, int cxoUtils_processSodaDocArg(cxoSodaDatabase *db, PyObject *arg,
cxoSodaDoc **doc); dpiSodaDoc **handle);
cxoVarType *cxoVarType_fromDataTypeInfo(dpiDataTypeInfo *info); cxoVarType *cxoVarType_fromDataTypeInfo(dpiDataTypeInfo *info);
cxoVarType *cxoVarType_fromPythonType(PyObject *type, cxoObjectType **objType); cxoVarType *cxoVarType_fromPythonType(PyObject *type, cxoObjectType **objType);

View File

@ -20,6 +20,12 @@ static PyObject *cxoSodaCollection_dropIndex(cxoSodaCollection*, PyObject*,
PyObject*); PyObject*);
static PyObject *cxoSodaCollection_find(cxoSodaCollection*, PyObject*); static PyObject *cxoSodaCollection_find(cxoSodaCollection*, PyObject*);
static PyObject *cxoSodaCollection_getDataGuide(cxoSodaCollection*, PyObject*); static PyObject *cxoSodaCollection_getDataGuide(cxoSodaCollection*, PyObject*);
static PyObject *cxoSodaCollection_insertMany(cxoSodaCollection*, PyObject*);
static PyObject *cxoSodaCollection_insertManyAndGet(cxoSodaCollection*,
PyObject*);
static PyObject *cxoSodaCollection_insertManyHelper(cxoSodaCollection *coll,
PyObject *docs, Py_ssize_t numDocs, dpiSodaDoc **handles,
dpiSodaDoc **returnHandles);
static PyObject *cxoSodaCollection_insertOne(cxoSodaCollection*, PyObject*); static PyObject *cxoSodaCollection_insertOne(cxoSodaCollection*, PyObject*);
static PyObject *cxoSodaCollection_insertOneAndGet(cxoSodaCollection*, static PyObject *cxoSodaCollection_insertOneAndGet(cxoSodaCollection*,
PyObject*); PyObject*);
@ -40,6 +46,9 @@ static PyMethodDef cxoMethods[] = {
{ "insertOne", (PyCFunction) cxoSodaCollection_insertOne, METH_O }, { "insertOne", (PyCFunction) cxoSodaCollection_insertOne, METH_O },
{ "insertOneAndGet", (PyCFunction) cxoSodaCollection_insertOneAndGet, { "insertOneAndGet", (PyCFunction) cxoSodaCollection_insertOneAndGet,
METH_O }, METH_O },
{ "insertMany", (PyCFunction) cxoSodaCollection_insertMany, METH_O },
{ "insertManyAndGet", (PyCFunction) cxoSodaCollection_insertManyAndGet,
METH_O },
{ NULL } { NULL }
}; };
@ -323,6 +332,135 @@ static PyObject *cxoSodaCollection_getDataGuide(cxoSodaCollection *coll,
} }
//-----------------------------------------------------------------------------
// cxoSodaCollection_insertMany()
//   Inserts multiple documents into the collection at one time. The argument
// must be a list (TypeError otherwise). A temporary array with one document
// handle slot per list element is allocated for the helper and freed again
// before returning. Returns whatever the helper returns: None on success or
// NULL with an exception set on failure.
//-----------------------------------------------------------------------------
static PyObject *cxoSodaCollection_insertMany(cxoSodaCollection *coll,
        PyObject *arg)
{
    dpiSodaDoc **docHandles;
    PyObject *retVal;
    Py_ssize_t count;

    // only lists are accepted as input
    if (!PyList_Check(arg)) {
        PyErr_SetString(PyExc_TypeError, "expecting list");
        return NULL;
    }

    // allocate scratch space for the handles of the documents to insert
    count = PyList_GET_SIZE(arg);
    docHandles = PyMem_Malloc(count * sizeof(*docHandles));
    if (!docHandles)
        return PyErr_NoMemory();

    // no array of returned handles is supplied, so nothing is fetched back
    retVal = cxoSodaCollection_insertManyHelper(coll, arg, count, docHandles,
            NULL);
    PyMem_Free(docHandles);
    return retVal;
}
//-----------------------------------------------------------------------------
// cxoSodaCollection_insertManyAndGet()
//   Inserts multiple documents into the collection at one time and returns a
// list of documents containing all but the content itself. The argument must
// be a list (TypeError otherwise). Two temporary arrays of handles are
// allocated, one for the documents being inserted and one for the documents
// returned; both are freed before returning.
//-----------------------------------------------------------------------------
static PyObject *cxoSodaCollection_insertManyAndGet(cxoSodaCollection *coll,
        PyObject *arg)
{
    dpiSodaDoc **inHandles, **outHandles;
    PyObject *retVal = NULL;
    Py_ssize_t count;
    size_t arrayBytes;

    // only lists are accepted as input
    if (!PyList_Check(arg)) {
        PyErr_SetString(PyExc_TypeError, "expecting list");
        return NULL;
    }

    // both arrays hold one handle per input document
    count = PyList_GET_SIZE(arg);
    arrayBytes = count * sizeof(dpiSodaDoc*);

    inHandles = PyMem_Malloc(arrayBytes);
    if (!inHandles) {
        PyErr_NoMemory();
        return NULL;
    }

    outHandles = PyMem_Malloc(arrayBytes);
    if (outHandles) {
        retVal = cxoSodaCollection_insertManyHelper(coll, arg, count,
                inHandles, outHandles);
        PyMem_Free(outHandles);
    } else {
        PyErr_NoMemory();
    }

    PyMem_Free(inHandles);
    return retVal;
}
//-----------------------------------------------------------------------------
// cxoSodaCollection_insertManyHelper()
//   Helper method to perform bulk insert of SODA documents into a collection.
//
// docs is the list of input documents and numDocs its length as measured by
// the caller. handles is a caller-allocated array of numDocs slots which is
// filled with one document handle per list element; every handle acquired
// here is released again before returning. returnHandles is either NULL (no
// documents are returned and the result is None) or a caller-allocated array
// of numDocs slots which receives the inserted documents; in that case the
// result is a new list of SODA document objects.
//
// Returns NULL with a Python exception set on failure. The arrays themselves
// are never freed here; that remains the responsibility of the caller.
//-----------------------------------------------------------------------------
static PyObject *cxoSodaCollection_insertManyHelper(cxoSodaCollection *coll,
        PyObject *docs, Py_ssize_t numDocs, dpiSodaDoc **handles,
        dpiSodaDoc **returnHandles)
{
    PyObject *element, *returnDocs;
    Py_ssize_t i, j;
    cxoSodaDoc *doc;
    uint32_t flags;
    int status;

    // the count is passed to ODPI-C as a 32-bit value; refuse anything that
    // would be silently truncated, since the loops below walk all numDocs
    // slots of both arrays
    if ((uint64_t) numDocs > UINT32_MAX) {
        PyErr_SetString(PyExc_OverflowError,
                "too many documents for a single bulk insert");
        return NULL;
    }

    // determine flags to use
    if (cxoConnection_getSodaFlags(coll->db->connection, &flags) < 0)
        return NULL;

    // populate array of document handles
    for (i = 0; i < numDocs; i++) {

        // converting a dictionary or list calls back into Python, which can
        // allow the input list to be modified; PyList_GET_ITEM() performs no
        // bounds checking so the size is verified on every iteration
        if (i >= PyList_GET_SIZE(docs)) {
            PyErr_SetString(PyExc_RuntimeError,
                    "list of documents changed size during bulk insert");
            status = -1;
        } else {
            element = PyList_GET_ITEM(docs, i);
            status = cxoUtils_processSodaDocArg(coll->db, element,
                    &handles[i]);
        }

        // on failure only the handles acquired so far are released
        if (status < 0) {
            for (j = 0; j < i; j++)
                dpiSodaDoc_release(handles[j]);
            return NULL;
        }

    }

    // perform bulk insert
    Py_BEGIN_ALLOW_THREADS
    status = dpiSodaColl_insertMany(coll->handle, (uint32_t) numDocs, handles,
            flags, returnHandles);
    Py_END_ALLOW_THREADS

    // the error is raised before the input handles are released; the input
    // handles are released regardless of the outcome
    if (status < 0)
        cxoError_raiseAndReturnNull();
    for (i = 0; i < numDocs; i++)
        dpiSodaDoc_release(handles[i]);
    if (status < 0)
        return NULL;

    // if no documents are to be returned, None is returned
    if (!returnHandles)
        Py_RETURN_NONE;

    // otherwise, return list of documents
    returnDocs = PyList_New(numDocs);
    if (!returnDocs) {
        for (i = 0; i < numDocs; i++)
            dpiSodaDoc_release(returnHandles[i]);
        return NULL;
    }
    for (i = 0; i < numDocs; i++) {
        doc = cxoSodaDoc_new(coll->db, returnHandles[i]);
        if (!doc) {
            // handles before index i are owned by the documents already
            // stored in the list and go away with it
            // NOTE(review): starting at j = i assumes cxoSodaDoc_new() does
            // not itself release the handle when it fails; if it does, this
            // loop must start at i + 1 to avoid a double release -- confirm
            for (j = i; j < numDocs; j++)
                dpiSodaDoc_release(returnHandles[j]);
            Py_DECREF(returnDocs);
            return NULL;
        }
        PyList_SET_ITEM(returnDocs, i, (PyObject*) doc);
    }

    return returnDocs;
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// cxoSodaCollection_insertOne() // cxoSodaCollection_insertOne()
// Insert a single document into the collection. // Insert a single document into the collection.
@ -330,23 +468,22 @@ static PyObject *cxoSodaCollection_getDataGuide(cxoSodaCollection *coll,
static PyObject *cxoSodaCollection_insertOne(cxoSodaCollection *coll, static PyObject *cxoSodaCollection_insertOne(cxoSodaCollection *coll,
PyObject *arg) PyObject *arg)
{ {
cxoSodaDoc *doc; dpiSodaDoc *handle;
uint32_t flags; uint32_t flags;
int status; int status;
if (cxoUtils_processSodaDocArg(coll->db, arg, &doc) < 0) if (cxoUtils_processSodaDocArg(coll->db, arg, &handle) < 0)
return NULL; return NULL;
if (cxoConnection_getSodaFlags(coll->db->connection, &flags) < 0) if (cxoConnection_getSodaFlags(coll->db->connection, &flags) < 0)
return NULL; return NULL;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
status = dpiSodaColl_insertOne(coll->handle, doc->handle, flags, NULL); status = dpiSodaColl_insertOne(coll->handle, handle, flags, NULL);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (status < 0) { if (status < 0)
cxoError_raiseAndReturnNull(); cxoError_raiseAndReturnNull();
Py_DECREF(doc); dpiSodaDoc_release(handle);
if (status < 0)
return NULL; return NULL;
}
Py_DECREF(doc);
Py_RETURN_NONE; Py_RETURN_NONE;
} }
@ -359,26 +496,24 @@ static PyObject *cxoSodaCollection_insertOne(cxoSodaCollection *coll,
static PyObject *cxoSodaCollection_insertOneAndGet(cxoSodaCollection *coll, static PyObject *cxoSodaCollection_insertOneAndGet(cxoSodaCollection *coll,
PyObject *arg) PyObject *arg)
{ {
dpiSodaDoc *returnedDoc; dpiSodaDoc *handle, *returnedHandle;
cxoSodaDoc *doc;
uint32_t flags; uint32_t flags;
int status; int status;
if (cxoUtils_processSodaDocArg(coll->db, arg, &doc) < 0) if (cxoUtils_processSodaDocArg(coll->db, arg, &handle) < 0)
return NULL; return NULL;
if (cxoConnection_getSodaFlags(coll->db->connection, &flags) < 0) if (cxoConnection_getSodaFlags(coll->db->connection, &flags) < 0)
return NULL; return NULL;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
status = dpiSodaColl_insertOne(coll->handle, doc->handle, flags, status = dpiSodaColl_insertOne(coll->handle, handle, flags,
&returnedDoc); &returnedHandle);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (status < 0) { if (status < 0)
cxoError_raiseAndReturnNull(); cxoError_raiseAndReturnNull();
Py_DECREF(doc); dpiSodaDoc_release(handle);
if (status < 0)
return NULL; return NULL;
} return (PyObject*) cxoSodaDoc_new(coll->db, returnedHandle);
Py_DECREF(doc);
return (PyObject*) cxoSodaDoc_new(coll->db, returnedDoc);
} }

View File

@ -507,23 +507,22 @@ static PyObject *cxoSodaOperation_replaceOne(cxoSodaOperation *op,
PyObject *arg) PyObject *arg)
{ {
int status, replaced; int status, replaced;
cxoSodaDoc *doc; dpiSodaDoc *handle;
uint32_t flags; uint32_t flags;
if (cxoConnection_getSodaFlags(op->coll->db->connection, &flags) < 0) if (cxoConnection_getSodaFlags(op->coll->db->connection, &flags) < 0)
return NULL; return NULL;
if (cxoUtils_processSodaDocArg(op->coll->db, arg, &doc) < 0) if (cxoUtils_processSodaDocArg(op->coll->db, arg, &handle) < 0)
return NULL; return NULL;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
status = dpiSodaColl_replaceOne(op->coll->handle, &op->options, status = dpiSodaColl_replaceOne(op->coll->handle, &op->options,
doc->handle, flags, &replaced, NULL); handle, flags, &replaced, NULL);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (status < 0) { if (status < 0)
cxoError_raiseAndReturnNull(); cxoError_raiseAndReturnNull();
Py_DECREF(doc); dpiSodaDoc_release(handle);
if (status < 0)
return NULL; return NULL;
}
Py_DECREF(doc);
if (replaced) if (replaced)
Py_RETURN_TRUE; Py_RETURN_TRUE;
Py_RETURN_FALSE; Py_RETURN_FALSE;
@ -538,27 +537,25 @@ static PyObject *cxoSodaOperation_replaceOne(cxoSodaOperation *op,
static PyObject *cxoSodaOperation_replaceOneAndGet(cxoSodaOperation *op, static PyObject *cxoSodaOperation_replaceOneAndGet(cxoSodaOperation *op,
PyObject *arg) PyObject *arg)
{ {
dpiSodaDoc *replacedDoc; dpiSodaDoc *handle, *replacedHandle;
cxoSodaDoc *doc;
uint32_t flags; uint32_t flags;
int status; int status;
if (cxoConnection_getSodaFlags(op->coll->db->connection, &flags) < 0) if (cxoConnection_getSodaFlags(op->coll->db->connection, &flags) < 0)
return NULL; return NULL;
if (cxoUtils_processSodaDocArg(op->coll->db, arg, &doc) < 0) if (cxoUtils_processSodaDocArg(op->coll->db, arg, &handle) < 0)
return NULL; return NULL;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
status = dpiSodaColl_replaceOne(op->coll->handle, &op->options, status = dpiSodaColl_replaceOne(op->coll->handle, &op->options, handle,
doc->handle, flags, NULL, &replacedDoc); flags, NULL, &replacedHandle);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (status < 0) { if (status < 0)
cxoError_raiseAndReturnNull(); cxoError_raiseAndReturnNull();
Py_DECREF(doc); dpiSodaDoc_release(handle);
if (status < 0)
return NULL; return NULL;
} if (replacedHandle)
Py_DECREF(doc); return (PyObject*) cxoSodaDoc_new(op->coll->db, replacedHandle);
if (replacedDoc)
return (PyObject*) cxoSodaDoc_new(op->coll->db, replacedDoc);
Py_RETURN_NONE; Py_RETURN_NONE;
} }

View File

@ -165,14 +165,16 @@ int cxoUtils_processJsonArg(PyObject *arg, cxoBuffer *buffer)
// a key or media type specified. // a key or media type specified.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
int cxoUtils_processSodaDocArg(cxoSodaDatabase *db, PyObject *arg, int cxoUtils_processSodaDocArg(cxoSodaDatabase *db, PyObject *arg,
cxoSodaDoc **doc) dpiSodaDoc **handle)
{ {
dpiSodaDoc *handle;
cxoBuffer buffer; cxoBuffer buffer;
cxoSodaDoc *doc;
if (PyObject_TypeCheck(arg, &cxoPyTypeSodaDoc)) { if (PyObject_TypeCheck(arg, &cxoPyTypeSodaDoc)) {
Py_INCREF(arg); doc = (cxoSodaDoc*) arg;
*doc = (cxoSodaDoc*) arg; if (dpiSodaDoc_addRef(doc->handle) < 0)
return cxoError_raiseAndReturnInt();
*handle = doc->handle;
} else if (PyDict_Check(arg) || PyList_Check(arg)) { } else if (PyDict_Check(arg) || PyList_Check(arg)) {
arg = PyObject_CallFunctionObjArgs(cxoJsonDumpFunction, arg, NULL); arg = PyObject_CallFunctionObjArgs(cxoJsonDumpFunction, arg, NULL);
if (!arg) if (!arg)
@ -183,18 +185,14 @@ int cxoUtils_processSodaDocArg(cxoSodaDatabase *db, PyObject *arg,
} }
Py_DECREF(arg); Py_DECREF(arg);
if (dpiSodaDb_createDocument(db->handle, NULL, 0, buffer.ptr, if (dpiSodaDb_createDocument(db->handle, NULL, 0, buffer.ptr,
buffer.size, NULL, 0, DPI_SODA_FLAGS_DEFAULT, &handle) < 0) { buffer.size, NULL, 0, DPI_SODA_FLAGS_DEFAULT, handle) < 0) {
cxoError_raiseAndReturnNull();
cxoBuffer_clear(&buffer); cxoBuffer_clear(&buffer);
return -1; return cxoError_raiseAndReturnInt();
} }
cxoBuffer_clear(&buffer); cxoBuffer_clear(&buffer);
*doc = cxoSodaDoc_new(db, handle);
if (!*doc)
return -1;
} else { } else {
PyErr_SetString(PyExc_TypeError, PyErr_SetString(PyExc_TypeError,
"value must be a SODA document or dictionary"); "value must be a SODA document or a dictionary or list");
return -1; return -1;
} }