Continued work on Unicode support; added new test cases for full unicode

support within Python 2.x; move away from character semantics which Oracle is
deprecating anyway to byte semantics which should hopefully eliminate the
problem with a backend character set of UTF-8.
This commit is contained in:
Anthony Tuininga 2008-10-14 04:51:43 +00:00
parent 45faba4a48
commit 2e26d0beb8
13 changed files with 327 additions and 136 deletions

View File

@ -56,7 +56,7 @@ static PyObject *Connection_GetVersion(udt_Connection*, void*);
static PyObject *Connection_GetMaxBytesPerCharacter(udt_Connection*, void*); static PyObject *Connection_GetMaxBytesPerCharacter(udt_Connection*, void*);
static PyObject *Connection_ContextManagerEnter(udt_Connection*, PyObject*); static PyObject *Connection_ContextManagerEnter(udt_Connection*, PyObject*);
static PyObject *Connection_ContextManagerExit(udt_Connection*, PyObject*); static PyObject *Connection_ContextManagerExit(udt_Connection*, PyObject*);
#ifdef OCI_NLS_CHARSET_MAXBYTESZ #ifndef WITH_UNICODE
static PyObject *Connection_GetEncoding(udt_Connection*, void*); static PyObject *Connection_GetEncoding(udt_Connection*, void*);
static PyObject *Connection_GetNationalEncoding(udt_Connection*, void*); static PyObject *Connection_GetNationalEncoding(udt_Connection*, void*);
#endif #endif
@ -119,7 +119,7 @@ static PyMemberDef g_ConnectionMembers[] = {
// declaration of calculated members for Python type "Connection" // declaration of calculated members for Python type "Connection"
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
static PyGetSetDef g_ConnectionCalcMembers[] = { static PyGetSetDef g_ConnectionCalcMembers[] = {
#ifdef OCI_NLS_CHARSET_MAXBYTESZ #ifndef WITH_UNICODE
{ "encoding", (getter) Connection_GetEncoding, 0, 0, 0 }, { "encoding", (getter) Connection_GetEncoding, 0, 0, 0 },
{ "nencoding", (getter) Connection_GetNationalEncoding, 0, 0, 0 }, { "nencoding", (getter) Connection_GetNationalEncoding, 0, 0, 0 },
#endif #endif
@ -364,6 +364,7 @@ static int Connection_SetOCIAttr(
PyObject *value, // value to set PyObject *value, // value to set
ub4 *attribute) // OCI attribute type ub4 *attribute) // OCI attribute type
{ {
udt_StringBuffer buffer;
sword status; sword status;
// make sure connection is connected // make sure connection is connected
@ -371,13 +372,15 @@ static int Connection_SetOCIAttr(
return -1; return -1;
// set the value in the OCI // set the value in the OCI
if (!PyString_Check(value)) { if (!CXORA_STRING_CHECK(value)) {
PyErr_SetString(PyExc_TypeError, "value must be a string"); PyErr_SetString(PyExc_TypeError, "value must be a string");
return -1; return -1;
} }
if (StringBuffer_Fill(&buffer, value))
return -1;
status = OCIAttrSet(self->sessionHandle, OCI_HTYPE_SESSION, status = OCIAttrSet(self->sessionHandle, OCI_HTYPE_SESSION,
PyString_AS_STRING(value), PyString_GET_SIZE(value), buffer.ptr, buffer.size, *attribute,
*attribute, self->environment->errorHandle); self->environment->errorHandle);
if (Environment_CheckForError(self->environment, status, if (Environment_CheckForError(self->environment, status,
"Connection_SetOCIAttr()") < 0) "Connection_SetOCIAttr()") < 0)
return -1; return -1;
@ -786,7 +789,7 @@ static PyObject *Connection_Repr(
} }
#ifdef OCI_NLS_CHARSET_MAXBYTESZ #ifndef WITH_UNICODE
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Connection_GetCharacterSetName() // Connection_GetCharacterSetName()
// Retrieve the IANA character set name for the attribute. // Retrieve the IANA character set name for the attribute.
@ -911,8 +914,8 @@ static PyObject *Connection_GetVersion(
udt_Connection *self, // connection object udt_Connection *self, // connection object
void *arg) // optional argument (ignored) void *arg) // optional argument (ignored)
{ {
PyObject *procName, *listOfArguments;
udt_Variable *versionVar, *compatVar; udt_Variable *versionVar, *compatVar;
PyObject *results, *temp;
udt_Cursor *cursor; udt_Cursor *cursor;
// if version has already been determined, no need to determine again // if version has already been determined, no need to determine again
@ -943,29 +946,37 @@ static PyObject *Connection_GetVersion(
return NULL; return NULL;
} }
// create the parameters for the function call // create the list of arguments
temp = Py_BuildValue("(s,[OO])", listOfArguments = PyList_New(2);
"begin dbms_utility.db_version(:ver, :compat); end;", if (!listOfArguments) {
versionVar, compatVar); Py_DECREF(versionVar);
Py_DECREF(versionVar); Py_DECREF(compatVar);
Py_DECREF(compatVar); Py_DECREF(cursor);
if (!temp) { return NULL;
}
PyList_SET_ITEM(listOfArguments, 0, (PyObject*) versionVar);
PyList_SET_ITEM(listOfArguments, 1, (PyObject*) compatVar);
// create the string variable
procName = CXORA_ASCII_TO_STRING("dbms_utility.db_version");
if (!procName) {
Py_DECREF(listOfArguments);
Py_DECREF(cursor); Py_DECREF(cursor);
return NULL; return NULL;
} }
// execute the cursor // call stored procedure
results = Cursor_Execute(cursor, temp, NULL); if (Cursor_Call(cursor, NULL, procName, listOfArguments) < 0) {
if (!results) { Py_DECREF(procName);
Py_DECREF(temp); Py_DECREF(listOfArguments);
Py_DECREF(cursor); Py_DECREF(cursor);
return NULL; return NULL;
} }
Py_DECREF(results); Py_DECREF(procName);
// retrieve value // retrieve value
self->version = Variable_GetValue(versionVar, 0); self->version = Variable_GetValue(versionVar, 0);
Py_DECREF(temp); Py_DECREF(listOfArguments);
Py_DECREF(cursor); Py_DECREF(cursor);
Py_XINCREF(self->version); Py_XINCREF(self->version);
return self->version; return self->version;

View File

@ -204,29 +204,22 @@ static int Cursor_FreeHandle(
udt_Cursor *self, // cursor object udt_Cursor *self, // cursor object
int raiseException) // raise an exception, if necesary? int raiseException) // raise an exception, if necesary?
{ {
ub4 tagLength; udt_StringBuffer buffer;
sword status; sword status;
char *tag;
if (self->handle) { if (self->handle) {
if (self->isOwned) { if (self->isOwned) {
OCIHandleFree(self->handle, OCI_HTYPE_STMT); OCIHandleFree(self->handle, OCI_HTYPE_STMT);
} else { } else if (self->connection->handle != 0) {
if (self->statementTag) { if (!StringBuffer_Fill(&buffer, self->statementTag) < 0)
tag = PyString_AS_STRING(self->statementTag); return (raiseException) ? -1 : 0;
tagLength = PyString_GET_SIZE(self->statementTag); status = OCIStmtRelease(self->handle,
} else { self->environment->errorHandle, (text*) buffer.ptr,
tag = NULL; buffer.size, OCI_DEFAULT);
tagLength = 0; StringBuffer_CLEAR(&buffer);
} if (raiseException && Environment_CheckForError(
if (self->connection->handle != 0) { self->environment, status, "Cursor_FreeHandle()") < 0)
status = OCIStmtRelease(self->handle, return -1;
self->environment->errorHandle, (text*) tag, tagLength,
OCI_DEFAULT);
if (raiseException && Environment_CheckForError(
self->environment, status, "Cursor_FreeHandle()") < 0)
return -1;
}
} }
} }
return 0; return 0;
@ -415,8 +408,7 @@ static int Cursor_GetBindNames(
// process the bind information returned // process the bind information returned
for (i = 0; i < foundElements; i++) { for (i = 0; i < foundElements; i++) {
if (!duplicate[i]) { if (!duplicate[i]) {
temp = PyString_FromStringAndSize(bindNames[i], temp = CXORA_BUFFER_TO_STRING(bindNames[i], bindNameLengths[i]);
bindNameLengths[i]);
if (!temp) { if (!temp) {
Py_DECREF(*names); Py_DECREF(*names);
PyMem_Free(buffer); PyMem_Free(buffer);
@ -649,14 +641,18 @@ static PyObject *Cursor_ItemDescriptionHelper(
type = (PyObject*) varType->pythonType; type = (PyObject*) varType->pythonType;
if (type == (PyObject*) &g_StringVarType) if (type == (PyObject*) &g_StringVarType)
displaySize = internalSize; displaySize = internalSize;
#ifndef WITH_UNICODE
else if (type == (PyObject*) &g_UnicodeVarType) else if (type == (PyObject*) &g_UnicodeVarType)
displaySize = internalSize / 2; displaySize = internalSize / 2;
#endif
else if (type == (PyObject*) &g_BinaryVarType) else if (type == (PyObject*) &g_BinaryVarType)
displaySize = internalSize; displaySize = internalSize;
else if (type == (PyObject*) &g_FixedCharVarType) else if (type == (PyObject*) &g_FixedCharVarType)
displaySize = internalSize; displaySize = internalSize;
#ifndef WITH_UNICODE
else if (type == (PyObject*) &g_FixedUnicodeVarType) else if (type == (PyObject*) &g_FixedUnicodeVarType)
displaySize = internalSize / 2; displaySize = internalSize / 2;
#endif
else if (type == (PyObject*) &g_NumberVarType) { else if (type == (PyObject*) &g_NumberVarType) {
if (precision) { if (precision) {
displaySize = precision + 1; displaySize = precision + 1;
@ -676,7 +672,7 @@ static PyObject *Cursor_ItemDescriptionHelper(
return NULL; return NULL;
// set each of the items in the tuple // set each of the items in the tuple
PyTuple_SET_ITEM(tuple, 0, CXORA_TO_STRING_OBJ(name, nameLength)); PyTuple_SET_ITEM(tuple, 0, CXORA_BUFFER_TO_STRING(name, nameLength));
Py_INCREF(type); Py_INCREF(type);
PyTuple_SET_ITEM(tuple, 1, type); PyTuple_SET_ITEM(tuple, 1, type);
PyTuple_SET_ITEM(tuple, 2, PyInt_FromLong(displaySize)); PyTuple_SET_ITEM(tuple, 2, PyInt_FromLong(displaySize));
@ -1308,7 +1304,7 @@ static int Cursor_Call(
Py_DECREF(arguments); Py_DECREF(arguments);
// create the statement object // create the statement object
format = PyString_FromString(statement); format = PyBytes_FromString(statement);
PyMem_Free(statement); PyMem_Free(statement);
if (!format) { if (!format) {
Py_DECREF(bindVariables); Py_DECREF(bindVariables);
@ -1436,13 +1432,8 @@ static PyObject *Cursor_Execute(
executeArgs = NULL; executeArgs = NULL;
if (!PyArg_ParseTuple(args, "O|O", &statement, &executeArgs)) if (!PyArg_ParseTuple(args, "O|O", &statement, &executeArgs))
return NULL; return NULL;
#ifdef WITH_UNICODE if (statement != Py_None && !CXORA_STRING_CHECK(statement)) {
if (statement != Py_None && !PyUnicode_Check(statement)) {
PyErr_SetString(PyExc_TypeError, "expecting None or unicode string");
#else
if (statement != Py_None && !PyString_Check(statement)) {
PyErr_SetString(PyExc_TypeError, "expecting None or a string"); PyErr_SetString(PyExc_TypeError, "expecting None or a string");
#endif
return NULL; return NULL;
} }
if (executeArgs && keywordArgs) { if (executeArgs && keywordArgs) {
@ -1520,8 +1511,8 @@ static PyObject *Cursor_ExecuteMany(
if (!PyArg_ParseTuple(args, "OO!", &statement, &PyList_Type, if (!PyArg_ParseTuple(args, "OO!", &statement, &PyList_Type,
&listOfArguments)) &listOfArguments))
return NULL; return NULL;
if (statement != Py_None && !PyString_Check(statement)) { if (statement != Py_None && !CXORA_STRING_CHECK(statement)) {
PyErr_SetString(PyExc_TypeError, "expecting None or a string"); PyErr_SetString(PyExc_TypeError, "expecting None or string");
return NULL; return NULL;
} }

View File

@ -75,7 +75,11 @@ static udt_Environment *Environment_New(
return NULL; return NULL;
environment->handle = NULL; environment->handle = NULL;
environment->errorHandle = NULL; environment->errorHandle = NULL;
#ifdef WITH_UNICODE
environment->maxBytesPerCharacter = 2;
#else
environment->maxBytesPerCharacter = 1; environment->maxBytesPerCharacter = 1;
#endif
environment->fixedWidth = 1; environment->fixedWidth = 1;
environment->maxStringBytes = MAX_STRING_CHARS; environment->maxStringBytes = MAX_STRING_CHARS;
@ -114,7 +118,7 @@ static udt_Environment *Environment_New(
} }
// acquire max bytes per character // acquire max bytes per character
#ifdef OCI_NLS_CHARSET_MAXBYTESZ #ifndef WITH_UNICODE
status = OCINlsNumericInfoGet(environment->handle, status = OCINlsNumericInfoGet(environment->handle,
environment->errorHandle, &environment->maxBytesPerCharacter, environment->errorHandle, &environment->maxBytesPerCharacter,
OCI_NLS_CHARSET_MAXBYTESZ); OCI_NLS_CHARSET_MAXBYTESZ);

View File

@ -109,7 +109,7 @@ static udt_Error *Error_New(
if (errorText[len] == 0 && errorText[len + 1] == 0) if (errorText[len] == 0 && errorText[len + 1] == 0)
break; break;
} }
error->message = CXORA_TO_STRING_OBJ(errorText, len); error->message = CXORA_BUFFER_TO_STRING(errorText, len);
#else #else
error->message = PyString_FromString(errorText); error->message = PyString_FromString(errorText);
#endif #endif
@ -146,10 +146,6 @@ static PyObject *Error_Str(
Py_INCREF(self->message); Py_INCREF(self->message);
return self->message; return self->message;
} }
#ifdef WITH_UNICODE return CXORA_ASCII_TO_STRING("");
return PyUnicode_DecodeASCII("", 0, NULL);
#else
return PyString_FromString("");
#endif
} }

View File

@ -453,10 +453,10 @@ static PyObject *NumberVar_GetValue(
udt_NumberVar *var, // variable to determine value for udt_NumberVar *var, // variable to determine value for
unsigned pos) // array position unsigned pos) // array position
{ {
PyObject *result, *stringObj;
char stringValue[200]; char stringValue[200];
long integerValue; long integerValue;
ub4 stringLength; ub4 stringLength;
PyObject *result;
sword status; sword status;
if (var->type == &vt_Integer || var->type == &vt_Boolean) { if (var->type == &vt_Integer || var->type == &vt_Boolean) {
@ -473,18 +473,19 @@ static PyObject *NumberVar_GetValue(
if (var->type == &vt_NumberAsString || var->type == &vt_LongInteger) { if (var->type == &vt_NumberAsString || var->type == &vt_LongInteger) {
stringLength = sizeof(stringValue); stringLength = sizeof(stringValue);
status = OCINumberToText(var->environment->errorHandle, status = OCINumberToText(var->environment->errorHandle,
&var->data[pos], (unsigned char*) "TM9", 3, NULL, 0, &var->data[pos], (text*) g_NumberToStringFormatBuffer.ptr,
&stringLength, (unsigned char*) stringValue); g_NumberToStringFormatBuffer.size, NULL, 0, &stringLength,
(unsigned char*) stringValue);
if (Environment_CheckForError(var->environment, status, if (Environment_CheckForError(var->environment, status,
"NumberVar_GetValue(): as string") < 0) "NumberVar_GetValue(): as string") < 0)
return NULL; return NULL;
stringObj = CXORA_BUFFER_TO_STRING(stringValue, stringLength);
if (!stringObj)
return NULL;
if (var->type == &vt_NumberAsString) if (var->type == &vt_NumberAsString)
return PyString_FromStringAndSize(stringValue, stringLength); return stringObj;
result = PyInt_FromString(stringValue, NULL, 10); result = PyNumber_Int(stringObj);
if (result || !PyErr_ExceptionMatches(PyExc_ValueError)) Py_DECREF(stringObj);
return result;
PyErr_Clear();
result = PyLong_FromString(stringValue, NULL, 10);
if (result || !PyErr_ExceptionMatches(PyExc_ValueError)) if (result || !PyErr_ExceptionMatches(PyExc_ValueError))
return result; return result;
PyErr_Clear(); PyErr_Clear();

View File

@ -169,12 +169,13 @@ static int SessionPool_Init(
PyObject *args, // arguments PyObject *args, // arguments
PyObject *keywordArgs) // keyword arguments PyObject *keywordArgs) // keyword arguments
{ {
unsigned usernameLength, passwordLength, dsnLength, poolNameLength;
unsigned minSessions, maxSessions, sessionIncrement; unsigned minSessions, maxSessions, sessionIncrement;
PyObject *threadedObj, *eventsObj, *homogeneousObj; PyObject *threadedObj, *eventsObj, *homogeneousObj;
const char *username, *password, *dsn, *poolName; udt_StringBuffer username, password, dsn;
int threaded, events, homogeneous; int threaded, events, homogeneous;
PyTypeObject *connectionType; PyTypeObject *connectionType;
unsigned poolNameLength;
const char *poolName;
sword status; sword status;
ub4 poolMode; ub4 poolMode;
ub1 getMode; ub1 getMode;
@ -190,11 +191,11 @@ static int SessionPool_Init(
threadedObj = eventsObj = homogeneousObj = NULL; threadedObj = eventsObj = homogeneousObj = NULL;
connectionType = &g_ConnectionType; connectionType = &g_ConnectionType;
getMode = OCI_SPOOL_ATTRVAL_NOWAIT; getMode = OCI_SPOOL_ATTRVAL_NOWAIT;
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "s#s#s#iii|OObOO", if (!PyArg_ParseTupleAndKeywords(args, keywordArgs, "O!O!O!iii|OObOO",
keywordList, &username, &usernameLength, &password, keywordList, CXORA_STRING_TYPE, &self->username,
&passwordLength, &dsn, &dsnLength, &minSessions, &maxSessions, CXORA_STRING_TYPE, &self->password, CXORA_STRING_TYPE, &self->dsn,
&sessionIncrement, &connectionType, &threadedObj, &getMode, &minSessions, &maxSessions, &sessionIncrement, &connectionType,
&eventsObj, &homogeneousObj)) &threadedObj, &getMode, &eventsObj, &homogeneousObj))
return -1; return -1;
if (!PyType_Check(connectionType)) { if (!PyType_Check(connectionType)) {
PyErr_SetString(g_ProgrammingErrorException, PyErr_SetString(g_ProgrammingErrorException,
@ -225,6 +226,9 @@ static int SessionPool_Init(
// initialize the object's members // initialize the object's members
Py_INCREF(connectionType); Py_INCREF(connectionType);
self->connectionType = connectionType; self->connectionType = connectionType;
Py_INCREF(self->dsn);
Py_INCREF(self->username);
Py_INCREF(self->password);
self->minSessions = minSessions; self->minSessions = minSessions;
self->maxSessions = maxSessions; self->maxSessions = maxSessions;
self->sessionIncrement = sessionIncrement; self->sessionIncrement = sessionIncrement;
@ -235,21 +239,6 @@ static int SessionPool_Init(
if (!self->environment) if (!self->environment)
return -1; return -1;
// create the string for the username
self->username = PyString_FromStringAndSize(username, usernameLength);
if (!self->username)
return -1;
// create the string for the password
self->password = PyString_FromStringAndSize(password, passwordLength);
if (!self->password)
return -1;
// create the string for the TNS entry
self->dsn = PyString_FromStringAndSize(dsn, dsnLength);
if (!self->dsn)
return -1;
// create the session pool handle // create the session pool handle
status = OCIHandleAlloc(self->environment->handle, (dvoid**) &self->handle, status = OCIHandleAlloc(self->environment->handle, (dvoid**) &self->handle,
OCI_HTYPE_SPOOL, 0, 0); OCI_HTYPE_SPOOL, 0, 0);
@ -263,19 +252,34 @@ static int SessionPool_Init(
poolMode |= OCI_SPC_HOMOGENEOUS; poolMode |= OCI_SPC_HOMOGENEOUS;
// create the session pool // create the session pool
if (StringBuffer_Fill(&username, self->username) < 0)
return -1;
if (StringBuffer_Fill(&password, self->password) < 0) {
StringBuffer_CLEAR(&username);
return -1;
}
if (StringBuffer_Fill(&dsn, self->dsn) < 0) {
StringBuffer_CLEAR(&username);
StringBuffer_CLEAR(&password);
return -1;
}
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
status = OCISessionPoolCreate(self->environment->handle, status = OCISessionPoolCreate(self->environment->handle,
self->environment->errorHandle, self->handle, self->environment->errorHandle, self->handle,
(OraText**) &poolName, &poolNameLength, (OraText*) dsn, dsnLength, (OraText**) &poolName, &poolNameLength, (OraText*) dsn.ptr,
minSessions, maxSessions, sessionIncrement, (OraText*) username, dsn.size, minSessions, maxSessions, sessionIncrement,
usernameLength, (OraText*) password, passwordLength, poolMode); (OraText*) username.ptr, username.size, (OraText*) password.ptr,
password.size, poolMode);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
StringBuffer_CLEAR(&username);
StringBuffer_CLEAR(&password);
StringBuffer_CLEAR(&dsn);
if (Environment_CheckForError(self->environment, status, if (Environment_CheckForError(self->environment, status,
"SessionPool_New(): create pool") < 0) "SessionPool_New(): create pool") < 0)
return -1; return -1;
// create the string for the pool name // create the string for the pool name
self->name = PyString_FromStringAndSize(poolName, poolNameLength); self->name = CXORA_BUFFER_TO_STRING(poolName, poolNameLength);
if (!self->name) if (!self->name)
return -1; return -1;

View File

@ -3,6 +3,7 @@
// Defines constants and routines specific to handling strings. // Defines constants and routines specific to handling strings.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// define structure for abstracting string buffers
typedef struct { typedef struct {
char *ptr; char *ptr;
Py_ssize_t size; Py_ssize_t size;
@ -12,6 +13,19 @@ typedef struct {
} udt_StringBuffer; } udt_StringBuffer;
// use the bytes methods in cx_Oracle and define them as the equivalent string
// type methods as is done in Python 2.6
#ifndef PyBytes_Check
#define PyBytes_Type PyString_Type
#define PyBytes_AS_STRING PyString_AS_STRING
#define PyBytes_GET_SIZE PyString_GET_SIZE
#define PyBytes_Check PyString_Check
#define PyBytes_Format PyString_Format
#define PyBytes_FromString PyString_FromString
#define PyBytes_FromStringAndSize PyString_FromStringAndSize
#endif
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// StringBuffer_Fill() // StringBuffer_Fill()
// Fill the string buffer with the UTF-16 data that Oracle expects. // Fill the string buffer with the UTF-16 data that Oracle expects.
@ -36,15 +50,15 @@ static int StringBuffer_Fill(
PyUnicode_GET_SIZE(obj), NULL, byteOrder); PyUnicode_GET_SIZE(obj), NULL, byteOrder);
if (!buf->encodedString) if (!buf->encodedString)
return -1; return -1;
buf->ptr = PyString_AS_STRING(buf->encodedString); buf->ptr = PyBytes_AS_STRING(buf->encodedString);
buf->size = PyString_GET_SIZE(buf->encodedString); buf->size = PyBytes_GET_SIZE(buf->encodedString);
#else #else
buf->ptr = (char*) PyUnicode_AS_UNICODE(obj); buf->ptr = (char*) PyUnicode_AS_UNICODE(obj);
buf->size = PyUnicode_GET_DATA_SIZE(obj); buf->size = PyUnicode_GET_DATA_SIZE(obj);
#endif #endif
#else #else
buf->ptr = PyString_AS_STRING(obj); buf->ptr = PyBytes_AS_STRING(obj);
buf->size = PyString_GET_SIZE(obj); buf->size = PyBytes_GET_SIZE(obj);
#endif #endif
return 0; return 0;
} }
@ -55,23 +69,29 @@ static int StringBuffer_Fill(
#define CXORA_ERROR_TEXT_LENGTH 2048 #define CXORA_ERROR_TEXT_LENGTH 2048
#define CXORA_STRING_TYPE &PyUnicode_Type #define CXORA_STRING_TYPE &PyUnicode_Type
#define CXORA_STRING_FROM_FORMAT PyUnicode_Format #define CXORA_STRING_FROM_FORMAT PyUnicode_Format
#define CXORA_STRING_CHECK PyUnicode_Check
#define CXORA_ASCII_TO_STRING(str) \
PyUnicode_DecodeASCII(str, strlen(str), NULL)
#ifdef Py_UNICODE_WIDE #ifdef Py_UNICODE_WIDE
#define StringBuffer_CLEAR(buffer) \ #define StringBuffer_CLEAR(buffer) \
Py_XDECREF((buffer)->encodedString) Py_XDECREF((buffer)->encodedString)
#define CXORA_TO_STRING_OBJ(buffer, numBytes) \ #define CXORA_BUFFER_TO_STRING(buffer, numBytes) \
PyUnicode_DecodeUTF16(buffer, numBytes, NULL, NULL) PyUnicode_DecodeUTF16(buffer, numBytes, NULL, NULL)
#else #else
#define StringBuffer_CLEAR(buffer) #define StringBuffer_CLEAR(buffer)
#define CXORA_TO_STRING_OBJ(buffer, numBytes) \ #define CXORA_BUFFER_TO_STRING(buffer, numBytes) \
PyUnicode_FromUnicode((Py_UNICODE*) (buffer), (numBytes) / 2) PyUnicode_FromUnicode((Py_UNICODE*) (buffer), (numBytes) / 2)
#endif #endif
#else #else
#define CXORA_CHARSETID 0 #define CXORA_CHARSETID 0
#define CXORA_ERROR_TEXT_LENGTH 1024 #define CXORA_ERROR_TEXT_LENGTH 1024
#define CXORA_STRING_TYPE &PyString_Type #define CXORA_STRING_TYPE &PyBytes_Type
#define CXORA_STRING_FROM_FORMAT PyString_Format #define CXORA_STRING_FROM_FORMAT PyBytes_Format
#define CXORA_STRING_CHECK PyBytes_Check
#define StringBuffer_CLEAR(buffer) #define StringBuffer_CLEAR(buffer)
#define CXORA_TO_STRING_OBJ(buffer, numBytes) \ #define CXORA_ASCII_TO_STRING(str) \
PyString_FromStringAndSize(buffer, numBytes) PyBytes_FromString(str)
#define CXORA_BUFFER_TO_STRING(buffer, numBytes) \
PyBytes_FromStringAndSize(buffer, numBytes)
#endif #endif

View File

@ -16,7 +16,9 @@ typedef struct {
// Declaration of string variable functions. // Declaration of string variable functions.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
static int StringVar_Initialize(udt_StringVar*, udt_Cursor*); static int StringVar_Initialize(udt_StringVar*, udt_Cursor*);
#ifndef WITH_UNICODE
static int StringVar_PostDefine(udt_StringVar*); static int StringVar_PostDefine(udt_StringVar*);
#endif
static int StringVar_SetValue(udt_StringVar*, unsigned, PyObject*); static int StringVar_SetValue(udt_StringVar*, unsigned, PyObject*);
static PyObject *StringVar_GetValue(udt_StringVar*, unsigned); static PyObject *StringVar_GetValue(udt_StringVar*, unsigned);
@ -48,6 +50,7 @@ static PyTypeObject g_StringVarType = {
}; };
#ifndef WITH_UNICODE
static PyTypeObject g_UnicodeVarType = { static PyTypeObject g_UnicodeVarType = {
PyVarObject_HEAD_INIT(NULL, 0) PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.UNICODE", // tp_name "cx_Oracle.UNICODE", // tp_name
@ -71,6 +74,7 @@ static PyTypeObject g_UnicodeVarType = {
Py_TPFLAGS_DEFAULT, // tp_flags Py_TPFLAGS_DEFAULT, // tp_flags
0 // tp_doc 0 // tp_doc
}; };
#endif
static PyTypeObject g_FixedCharVarType = { static PyTypeObject g_FixedCharVarType = {
@ -98,6 +102,7 @@ static PyTypeObject g_FixedCharVarType = {
}; };
#ifndef WITH_UNICODE
static PyTypeObject g_FixedUnicodeVarType = { static PyTypeObject g_FixedUnicodeVarType = {
PyVarObject_HEAD_INIT(NULL, 0) PyVarObject_HEAD_INIT(NULL, 0)
"cx_Oracle.FIXED_UNICODE", // tp_name "cx_Oracle.FIXED_UNICODE", // tp_name
@ -121,6 +126,7 @@ static PyTypeObject g_FixedUnicodeVarType = {
Py_TPFLAGS_DEFAULT, // tp_flags Py_TPFLAGS_DEFAULT, // tp_flags
0 // tp_doc 0 // tp_doc
}; };
#endif
static PyTypeObject g_RowidVarType = { static PyTypeObject g_RowidVarType = {
@ -194,6 +200,7 @@ static udt_VariableType vt_String = {
}; };
#ifndef WITH_UNICODE
static udt_VariableType vt_NationalCharString = { static udt_VariableType vt_NationalCharString = {
(InitializeProc) StringVar_Initialize, (InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL, (FinalizeProc) NULL,
@ -210,6 +217,7 @@ static udt_VariableType vt_NationalCharString = {
1, // can be copied 1, // can be copied
1 // can be in array 1 // can be in array
}; };
#endif
static udt_VariableType vt_FixedChar = { static udt_VariableType vt_FixedChar = {
@ -230,6 +238,7 @@ static udt_VariableType vt_FixedChar = {
}; };
#ifndef WITH_UNICODE
static udt_VariableType vt_FixedNationalChar = { static udt_VariableType vt_FixedNationalChar = {
(InitializeProc) StringVar_Initialize, (InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL, (FinalizeProc) NULL,
@ -246,6 +255,7 @@ static udt_VariableType vt_FixedNationalChar = {
1, // can be copied 1, // can be copied
1 // can be in array 1 // can be in array
}; };
#endif
static udt_VariableType vt_Rowid = { static udt_VariableType vt_Rowid = {
@ -302,6 +312,7 @@ static int StringVar_Initialize(
} }
#ifndef WITH_UNICODE
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// StringVar_PostDefine() // StringVar_PostDefine()
// Set the character set information when values are fetched from this // Set the character set information when values are fetched from this
@ -329,6 +340,7 @@ static int StringVar_PostDefine(
return 0; return 0;
} }
#endif
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -343,7 +355,6 @@ static int StringVar_SetValue(
PyObject *encodedString; PyObject *encodedString;
Py_ssize_t bufferSize; Py_ssize_t bufferSize;
const void *buffer; const void *buffer;
ub2 actualLength;
// get the buffer data and size for binding // get the buffer data and size for binding
encodedString = NULL; encodedString = NULL;
@ -359,7 +370,6 @@ static int StringVar_SetValue(
"expecting string or buffer data"); "expecting string or buffer data");
return -1; return -1;
} }
actualLength = (ub2) bufferSize;
} else { } else {
if (!PyUnicode_Check(value)) { if (!PyUnicode_Check(value)) {
PyErr_SetString(PyExc_TypeError, "expecting unicode data"); PyErr_SetString(PyExc_TypeError, "expecting unicode data");
@ -372,13 +382,12 @@ static int StringVar_SetValue(
PyUnicode_GET_SIZE(value), NULL, byteOrder); PyUnicode_GET_SIZE(value), NULL, byteOrder);
if (!encodedString) if (!encodedString)
return -1; return -1;
buffer = PyString_AS_STRING(encodedString); buffer = PyBytes_AS_STRING(encodedString);
bufferSize = PyString_GET_SIZE(encodedString); bufferSize = PyBytes_GET_SIZE(encodedString);
#else #else
buffer = PyUnicode_AS_UNICODE(value); buffer = PyUnicode_AS_UNICODE(value);
bufferSize = sizeof(Py_UNICODE) * PyUnicode_GET_SIZE(value); bufferSize = PyUnicode_GET_DATA_SIZE(value);
#endif #endif
actualLength = bufferSize / 2;
} }
// ensure that the buffer is not too large // ensure that the buffer is not too large
@ -395,7 +404,7 @@ static int StringVar_SetValue(
} }
// keep a copy of the string // keep a copy of the string
var->actualLength[pos] = actualLength; var->actualLength[pos] = (ub2) bufferSize;
if (bufferSize) if (bufferSize)
memcpy(var->data + var->maxLength * pos, buffer, bufferSize); memcpy(var->data + var->maxLength * pos, buffer, bufferSize);
Py_XDECREF(encodedString); Py_XDECREF(encodedString);
@ -415,12 +424,18 @@ static PyObject *StringVar_GetValue(
char *data; char *data;
data = var->data + pos * var->maxLength; data = var->data + pos * var->maxLength;
#ifdef WTIH_UNICODE #ifdef WITH_UNICODE
if (var->type->charsetForm == SQLCS_IMPLICIT)
#else
if (var->type == &vt_Binary) if (var->type == &vt_Binary)
return PyBytes_FromStringAndSize(data, var->actualLength[pos]);
return CXORA_BUFFER_TO_STRING(data, var->actualLength[pos]);
#else
if (var->type->charsetForm == SQLCS_IMPLICIT)
return PyBytes_FromStringAndSize(data, var->actualLength[pos]);
#ifdef Py_UNICODE_WIDE
return PyUnicode_DecodeUTF16(data, var->actualLength[pos], NULL, NULL);
#else
return PyUnicode_FromUnicode((Py_UNICODE*) data, var->actualLength[pos]);
#endif
#endif #endif
return PyString_FromStringAndSize(data, var->actualLength[pos]);
return CXORA_TO_STRING_OBJ(data, var->actualLength[pos]);
} }

View File

@ -332,8 +332,10 @@ static int Variable_Check(
Py_TYPE(object) == &g_NumberVarType || Py_TYPE(object) == &g_NumberVarType ||
Py_TYPE(object) == &g_StringVarType || Py_TYPE(object) == &g_StringVarType ||
Py_TYPE(object) == &g_FixedCharVarType || Py_TYPE(object) == &g_FixedCharVarType ||
#ifndef WITH_UNICODE
Py_TYPE(object) == &g_UnicodeVarType || Py_TYPE(object) == &g_UnicodeVarType ||
Py_TYPE(object) == &g_FixedUnicodeVarType || Py_TYPE(object) == &g_FixedUnicodeVarType ||
#endif
Py_TYPE(object) == &g_RowidVarType || Py_TYPE(object) == &g_RowidVarType ||
Py_TYPE(object) == &g_BinaryVarType || Py_TYPE(object) == &g_BinaryVarType ||
Py_TYPE(object) == &g_TimestampVarType Py_TYPE(object) == &g_TimestampVarType
@ -355,16 +357,18 @@ static udt_VariableType *Variable_TypeByPythonType(
{ {
if (type == (PyObject*) &g_StringVarType) if (type == (PyObject*) &g_StringVarType)
return &vt_String; return &vt_String;
if (type == (PyObject*) &PyString_Type) if (type == (PyObject*) CXORA_STRING_TYPE)
return &vt_String; return &vt_String;
if (type == (PyObject*) &g_FixedCharVarType) if (type == (PyObject*) &g_FixedCharVarType)
return &vt_FixedChar; return &vt_FixedChar;
#ifndef WITH_UNICODE
if (type == (PyObject*) &g_UnicodeVarType) if (type == (PyObject*) &g_UnicodeVarType)
return &vt_NationalCharString; return &vt_NationalCharString;
if (type == (PyObject*) &PyUnicode_Type) if (type == (PyObject*) &PyUnicode_Type)
return &vt_NationalCharString; return &vt_NationalCharString;
if (type == (PyObject*) &g_FixedUnicodeVarType) if (type == (PyObject*) &g_FixedUnicodeVarType)
return &vt_FixedNationalChar; return &vt_FixedNationalChar;
#endif
if (type == (PyObject*) &g_RowidVarType) if (type == (PyObject*) &g_RowidVarType)
return &vt_Rowid; return &vt_Rowid;
if (type == (PyObject*) &g_BinaryVarType) if (type == (PyObject*) &g_BinaryVarType)
@ -437,10 +441,12 @@ static udt_VariableType *Variable_TypeByValue(
// handle scalars // handle scalars
if (value == Py_None) if (value == Py_None)
return &vt_String; return &vt_String;
if (PyString_Check(value)) if (CXORA_STRING_CHECK(value))
return &vt_String; return &vt_String;
#ifndef WITH_UNICODE
if (PyUnicode_Check(value)) if (PyUnicode_Check(value))
return &vt_NationalCharString; return &vt_NationalCharString;
#endif
if (PyInt_Check(value)) if (PyInt_Check(value))
return &vt_Integer; return &vt_Integer;
if (PyLong_Check(value)) if (PyLong_Check(value))
@ -503,12 +509,16 @@ static udt_VariableType *Variable_TypeByOracleDataType (
case SQLT_LNG: case SQLT_LNG:
return &vt_LongString; return &vt_LongString;
case SQLT_AFC: case SQLT_AFC:
#ifndef WITH_UNICODE
if (charsetForm == SQLCS_NCHAR) if (charsetForm == SQLCS_NCHAR)
return &vt_FixedNationalChar; return &vt_FixedNationalChar;
#endif
return &vt_FixedChar; return &vt_FixedChar;
case SQLT_CHR: case SQLT_CHR:
#ifndef WITH_UNICODE
if (charsetForm == SQLCS_NCHAR) if (charsetForm == SQLCS_NCHAR)
return &vt_NationalCharString; return &vt_NationalCharString;
#endif
return &vt_String; return &vt_String;
case SQLT_RDD: case SQLT_RDD:
return &vt_Rowid; return &vt_Rowid;
@ -1031,8 +1041,8 @@ static int Variable_InternalBind(
return -1; return -1;
// set the charset form and id if applicable // set the charset form and id if applicable
#ifndef WITH_UNICODE
if (var->type->charsetForm != SQLCS_IMPLICIT) { if (var->type->charsetForm != SQLCS_IMPLICIT) {
ub4 lengthInChars = var->maxLength / 2;
ub2 charsetId = OCI_UTF16ID; ub2 charsetId = OCI_UTF16ID;
status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND, status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND,
(dvoid*) &var->type->charsetForm, 0, OCI_ATTR_CHARSET_FORM, (dvoid*) &var->type->charsetForm, 0, OCI_ATTR_CHARSET_FORM,
@ -1047,12 +1057,13 @@ static int Variable_InternalBind(
"Variable_InternalBind(): setting charset Id") < 0) "Variable_InternalBind(): setting charset Id") < 0)
return -1; return -1;
status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND, status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND,
(dvoid*) &lengthInChars, 0, OCI_ATTR_CHAR_COUNT, (dvoid*) &var->maxLength, 0, OCI_ATTR_MAXDATA_SIZE,
var->environment->errorHandle); var->environment->errorHandle);
if (Environment_CheckForError(var->environment, status, if (Environment_CheckForError(var->environment, status,
"Variable_InternalBind(): set char count") < 0) "Variable_InternalBind(): set max data size") < 0)
return -1; return -1;
} }
#endif
// set the max data size for strings // set the max data size for strings
if ((var->type == &vt_String || var->type == &vt_FixedChar) if ((var->type == &vt_String || var->type == &vt_FixedChar)

View File

@ -79,6 +79,7 @@ typedef int Py_ssize_t;
#define BUILD_VERSION_STRING xstr(BUILD_VERSION) #define BUILD_VERSION_STRING xstr(BUILD_VERSION)
#define DRIVER_NAME "cx_Oracle-"BUILD_VERSION_STRING #define DRIVER_NAME "cx_Oracle-"BUILD_VERSION_STRING
#include "StringUtils.c"
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Globals // Globals
@ -98,6 +99,8 @@ static PyObject *g_ProgrammingErrorException = NULL;
static PyObject *g_NotSupportedErrorException = NULL; static PyObject *g_NotSupportedErrorException = NULL;
static PyTypeObject *g_DateTimeType = NULL; static PyTypeObject *g_DateTimeType = NULL;
static PyTypeObject *g_DecimalType = NULL; static PyTypeObject *g_DecimalType = NULL;
static PyObject *g_NumberToStringFormatObj = NULL;
static udt_StringBuffer g_NumberToStringFormatBuffer;
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -141,7 +144,6 @@ static int GetModuleAndName(
} }
#include "StringUtils.c"
#include "Environment.c" #include "Environment.c"
#include "SessionPool.c" #include "SessionPool.c"
@ -346,6 +348,14 @@ void initcx_Oracle(void)
"Decimal"); "Decimal");
PyErr_Clear(); PyErr_Clear();
// set up the string and buffer for converting numbers to strings
g_NumberToStringFormatObj = CXORA_ASCII_TO_STRING("TM9");
if (!g_NumberToStringFormatObj)
return;
if (StringBuffer_Fill(&g_NumberToStringFormatBuffer,
g_NumberToStringFormatObj) < 0)
return;
// prepare the types for use by the module // prepare the types for use by the module
MAKE_TYPE_READY(&g_ConnectionType); MAKE_TYPE_READY(&g_ConnectionType);
MAKE_TYPE_READY(&g_CursorType); MAKE_TYPE_READY(&g_CursorType);
@ -374,8 +384,10 @@ void initcx_Oracle(void)
MAKE_VARIABLE_TYPE_READY(&g_BFILEVarType); MAKE_VARIABLE_TYPE_READY(&g_BFILEVarType);
MAKE_VARIABLE_TYPE_READY(&g_CursorVarType); MAKE_VARIABLE_TYPE_READY(&g_CursorVarType);
MAKE_VARIABLE_TYPE_READY(&g_ObjectVarType); MAKE_VARIABLE_TYPE_READY(&g_ObjectVarType);
#ifndef WITH_UNICODE
MAKE_VARIABLE_TYPE_READY(&g_UnicodeVarType); MAKE_VARIABLE_TYPE_READY(&g_UnicodeVarType);
MAKE_VARIABLE_TYPE_READY(&g_FixedUnicodeVarType); MAKE_VARIABLE_TYPE_READY(&g_FixedUnicodeVarType);
#endif
#ifdef SQLT_BFLOAT #ifdef SQLT_BFLOAT
MAKE_VARIABLE_TYPE_READY(&g_NativeFloatVarType); MAKE_VARIABLE_TYPE_READY(&g_NativeFloatVarType);
#endif #endif
@ -442,7 +454,10 @@ void initcx_Oracle(void)
ADD_TYPE_OBJECT("OBJECT", &g_ObjectVarType) ADD_TYPE_OBJECT("OBJECT", &g_ObjectVarType)
ADD_TYPE_OBJECT("DATETIME", &g_DateTimeVarType) ADD_TYPE_OBJECT("DATETIME", &g_DateTimeVarType)
ADD_TYPE_OBJECT("FIXED_CHAR", &g_FixedCharVarType) ADD_TYPE_OBJECT("FIXED_CHAR", &g_FixedCharVarType)
#ifndef WITH_UNICODE
ADD_TYPE_OBJECT("FIXED_UNICODE", &g_FixedUnicodeVarType) ADD_TYPE_OBJECT("FIXED_UNICODE", &g_FixedUnicodeVarType)
ADD_TYPE_OBJECT("UNICODE", &g_UnicodeVarType)
#endif
ADD_TYPE_OBJECT("LOB", &g_ExternalLobVarType) ADD_TYPE_OBJECT("LOB", &g_ExternalLobVarType)
ADD_TYPE_OBJECT("LONG_BINARY", &g_LongBinaryVarType) ADD_TYPE_OBJECT("LONG_BINARY", &g_LongBinaryVarType)
ADD_TYPE_OBJECT("LONG_STRING", &g_LongStringVarType) ADD_TYPE_OBJECT("LONG_STRING", &g_LongStringVarType)
@ -451,7 +466,6 @@ void initcx_Oracle(void)
ADD_TYPE_OBJECT("ROWID", &g_RowidVarType) ADD_TYPE_OBJECT("ROWID", &g_RowidVarType)
ADD_TYPE_OBJECT("STRING", &g_StringVarType) ADD_TYPE_OBJECT("STRING", &g_StringVarType)
ADD_TYPE_OBJECT("TIMESTAMP", &g_TimestampVarType) ADD_TYPE_OBJECT("TIMESTAMP", &g_TimestampVarType)
ADD_TYPE_OBJECT("UNICODE", &g_UnicodeVarType)
#ifdef SQLT_BFLOAT #ifdef SQLT_BFLOAT
ADD_TYPE_OBJECT("NATIVE_FLOAT", &g_NativeFloatVarType) ADD_TYPE_OBJECT("NATIVE_FLOAT", &g_NativeFloatVarType)
#endif #endif

View File

@ -2,13 +2,16 @@
import cx_Oracle import cx_Oracle
import os import os
import sys
import unittest import unittest
def GetValue(name, label): def GetValue(name, label):
value = os.environ.get("CX_ORACLE_" + name) value = os.environ.get("CX_ORACLE_" + name)
if value is None: if value is None:
value = raw_input(label + ": ") value = raw_input(label + ": ")
return value if hasattr(cx_Oracle, "UNICODE") or sys.version_info[0] >= 3:
return value
return unicode(value)
USERNAME = GetValue("USERNAME", "user name") USERNAME = GetValue("USERNAME", "user name")
PASSWORD = GetValue("PASSWORD", "password") PASSWORD = GetValue("PASSWORD", "password")

View File

@ -10,20 +10,25 @@ print "Running tests for cx_Oracle version", cx_Oracle.version
import TestEnv import TestEnv
moduleNames = [ if hasattr(cx_Oracle, "UNICODE") or sys.version_info[0] >= 3:
"Connection", moduleNames = [
"Cursor", "Connection",
"CursorVar", "Cursor",
"DateTimeVar", "CursorVar",
"LobVar", "DateTimeVar",
"LongVar", "LobVar",
"NumberVar", "LongVar",
"ObjectVar", "NumberVar",
"SessionPool", "ObjectVar",
"StringVar", "SessionPool",
"TimestampVar", "StringVar",
"UnicodeVar" "TimestampVar",
] "UnicodeVar"
]
else:
moduleNames = [
"uConnection"
]
class BaseTestCase(unittest.TestCase): class BaseTestCase(unittest.TestCase):

116
test/uConnection.py Normal file
View File

@ -0,0 +1,116 @@
"""Module for testing connections."""
import threading
class TestConnection(TestCase):
def __ConnectAndDrop(self):
"""Connect to the database, perform a query and drop the connection."""
connection = cx_Oracle.connect(self.username, self.password,
self.tnsentry, threaded = True)
cursor = connection.cursor()
cursor.execute(u"select count(*) from TestNumbers")
count, = cursor.fetchone()
self.failUnlessEqual(count, 10)
def setUp(self):
self.username = USERNAME
self.password = PASSWORD
self.tnsentry = TNSENTRY
def verifyArgs(self, connection):
self.failUnlessEqual(connection.username, self.username,
"user name differs")
self.failUnlessEqual(connection.password, self.password,
"password differs")
self.failUnlessEqual(connection.tnsentry, self.tnsentry,
"tnsentry differs")
def testAllArgs(self):
"connection to database with user, password, TNS separate"
connection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
self.verifyArgs(connection)
def testBadConnectString(self):
"connection to database with bad connect string"
self.failUnlessRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username)
self.failUnlessRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username + u"@" + self.tnsentry)
self.failUnlessRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username + u"@" + self.tnsentry + u"/" + self.password)
def testBadPassword(self):
"connection to database with bad password"
self.failUnlessRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username, self.password + u"X", self.tnsentry)
def testExceptionOnClose(self):
"confirm an exception is raised after closing a connection"
connection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
connection.close()
self.failUnlessRaises(cx_Oracle.InterfaceError, connection.rollback)
def testMakeDSN(self):
"test making a data source name from host, port and sid"
formatString = u"(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=" \
u"(PROTOCOL=TCP)(HOST=%s)(PORT=%d)))(CONNECT_DATA=(SID=%s)))"
args = (u"hostname", 1521, u"TEST")
result = cx_Oracle.makedsn(*args)
self.failUnlessEqual(result, formatString % args)
def testSingleArg(self):
"connection to database with user, password, TNS together"
connection = cx_Oracle.connect(u"%s/%s@%s" % \
(self.username, self.password, self.tnsentry))
self.verifyArgs(connection)
def testVersion(self):
"connection version is a string"
connection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
self.failUnless(isinstance(connection.version, unicode))
def testRollbackOnClose(self):
"connection rolls back before close"
connection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
cursor = connection.cursor()
cursor.execute(u"truncate table TestExecuteMany")
otherConnection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
otherCursor = otherConnection.cursor()
otherCursor.execute(u"insert into TestExecuteMany values (1)")
otherConnection.close()
cursor.execute(u"select count(*) from TestExecuteMany")
count, = cursor.fetchone()
self.failUnlessEqual(count, 0)
def testRollbackOnDel(self):
"connection rolls back before destruction"
connection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
cursor = connection.cursor()
cursor.execute(u"truncate table TestExecuteMany")
otherConnection = cx_Oracle.connect(self.username, self.password,
self.tnsentry)
otherCursor = otherConnection.cursor()
otherCursor.execute(u"insert into TestExecuteMany values (1)")
del otherCursor
del otherConnection
cursor.execute(u"select count(*) from TestExecuteMany")
count, = cursor.fetchone()
self.failUnlessEqual(count, 0)
def testThreading(self):
"connection to database with multiple threads"
threads = []
for i in range(20):
thread = threading.Thread(None, self.__ConnectAndDrop)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()