Added support for returning unicode values for nchar and nvarchar data and

for binding unicode. Thanks to Amaury Forgeot d'Arc for the initial patch.
This commit is contained in:
Anthony Tuininga 2008-09-30 18:11:45 +00:00
parent d1e5fa3737
commit ea3def86c5
7 changed files with 583 additions and 114 deletions

View File

@ -588,10 +588,10 @@ static PyObject *Cursor_ItemDescriptionHelper(
unsigned pos, // position in description unsigned pos, // position in description
OCIParam *param) // parameter to use for description OCIParam *param) // parameter to use for description
{ {
ub2 internalSize, sqlDataType;
udt_VariableType *varType; udt_VariableType *varType;
int displaySize, index; int displaySize, index;
PyObject *tuple, *type; PyObject *tuple, *type;
ub2 internalSize;
ub4 nameLength; ub4 nameLength;
sb2 precision; sb2 precision;
sword status; sword status;
@ -599,15 +599,8 @@ static PyObject *Cursor_ItemDescriptionHelper(
ub1 nullOk; ub1 nullOk;
sb1 scale; sb1 scale;
// acquire type of item
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &sqlDataType, 0,
OCI_ATTR_DATA_TYPE, self->environment->errorHandle);
if (Environment_CheckForError(self->environment, status,
"Cursor_ItemDescription(): data type") < 0)
return NULL;
// acquire usable type of item // acquire usable type of item
varType = Variable_TypeByOracleDataType(sqlDataType, SQLCS_IMPLICIT); varType = Variable_TypeByOracleDescriptor(param, self->environment);
if (!varType) if (!varType)
return NULL; return NULL;
@ -652,10 +645,14 @@ static PyObject *Cursor_ItemDescriptionHelper(
type = (PyObject*) varType->pythonType; type = (PyObject*) varType->pythonType;
if (type == (PyObject*) &g_StringVarType) if (type == (PyObject*) &g_StringVarType)
displaySize = internalSize; displaySize = internalSize;
else if (type == (PyObject*) &g_UnicodeVarType)
displaySize = internalSize / 2;
else if (type == (PyObject*) &g_BinaryVarType) else if (type == (PyObject*) &g_BinaryVarType)
displaySize = internalSize; displaySize = internalSize;
else if (type == (PyObject*) &g_FixedCharVarType) else if (type == (PyObject*) &g_FixedCharVarType)
displaySize = internalSize; displaySize = internalSize;
else if (type == (PyObject*) &g_FixedUnicodeVarType)
displaySize = internalSize / 2;
else if (type == (PyObject*) &g_NumberVarType) { else if (type == (PyObject*) &g_NumberVarType) {
if (precision) { if (precision) {
displaySize = precision + 1; displaySize = precision + 1;

View File

@ -9,8 +9,6 @@
typedef struct { typedef struct {
Variable_HEAD Variable_HEAD
char *data; char *data;
ub1 charsetForm;
ub2 charsetId;
} udt_StringVar; } udt_StringVar;
@ -18,7 +16,7 @@ typedef struct {
// Declaration of string variable functions. // Declaration of string variable functions.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
static int StringVar_Initialize(udt_StringVar*, udt_Cursor*); static int StringVar_Initialize(udt_StringVar*, udt_Cursor*);
static int StringVar_PreDefine(udt_StringVar*, OCIParam*); static int StringVar_PostDefine(udt_StringVar*);
static int StringVar_SetValue(udt_StringVar*, unsigned, PyObject*); static int StringVar_SetValue(udt_StringVar*, unsigned, PyObject*);
static PyObject *StringVar_GetValue(udt_StringVar*, unsigned); static PyObject *StringVar_GetValue(udt_StringVar*, unsigned);
@ -51,6 +49,32 @@ static PyTypeObject g_StringVarType = {
}; };
static PyTypeObject g_UnicodeVarType = {
PyObject_HEAD_INIT(NULL)
0, // ob_size
"cx_Oracle.UNICODE", // tp_name
sizeof(udt_StringVar), // tp_basicsize
0, // tp_itemsize
(destructor) Variable_Free, // tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
(reprfunc) Variable_Repr, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
(getattrofunc) Variable_GetAttr, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0 // tp_doc
};
static PyTypeObject g_FixedCharVarType = { static PyTypeObject g_FixedCharVarType = {
PyObject_HEAD_INIT(NULL) PyObject_HEAD_INIT(NULL)
0, // ob_size 0, // ob_size
@ -77,6 +101,32 @@ static PyTypeObject g_FixedCharVarType = {
}; };
static PyTypeObject g_FixedUnicodeVarType = {
PyObject_HEAD_INIT(NULL)
0, // ob_size
"cx_Oracle.FIXED_UNICODE", // tp_name
sizeof(udt_StringVar), // tp_basicsize
0, // tp_itemsize
(destructor) Variable_Free, // tp_dealloc
0, // tp_print
0, // tp_getattr
0, // tp_setattr
0, // tp_compare
(reprfunc) Variable_Repr, // tp_repr
0, // tp_as_number
0, // tp_as_sequence
0, // tp_as_mapping
0, // tp_hash
0, // tp_call
0, // tp_str
(getattrofunc) Variable_GetAttr, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT, // tp_flags
0 // tp_doc
};
static PyTypeObject g_RowidVarType = { static PyTypeObject g_RowidVarType = {
PyObject_HEAD_INIT(NULL) PyObject_HEAD_INIT(NULL)
0, // ob_size 0, // ob_size
@ -135,7 +185,7 @@ static PyTypeObject g_BinaryVarType = {
static udt_VariableType vt_String = { static udt_VariableType vt_String = {
(InitializeProc) StringVar_Initialize, (InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL, (FinalizeProc) NULL,
(PreDefineProc) StringVar_PreDefine, (PreDefineProc) NULL,
(PostDefineProc) NULL, (PostDefineProc) NULL,
(IsNullProc) NULL, (IsNullProc) NULL,
(SetValueProc) StringVar_SetValue, (SetValueProc) StringVar_SetValue,
@ -153,12 +203,12 @@ static udt_VariableType vt_String = {
static udt_VariableType vt_NationalCharString = { static udt_VariableType vt_NationalCharString = {
(InitializeProc) StringVar_Initialize, (InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL, (FinalizeProc) NULL,
(PreDefineProc) StringVar_PreDefine, (PreDefineProc) NULL,
(PostDefineProc) NULL, (PostDefineProc) StringVar_PostDefine,
(IsNullProc) NULL, (IsNullProc) NULL,
(SetValueProc) StringVar_SetValue, (SetValueProc) StringVar_SetValue,
(GetValueProc) StringVar_GetValue, (GetValueProc) StringVar_GetValue,
&g_StringVarType, // Python type &g_UnicodeVarType, // Python type
SQLT_CHR, // Oracle type SQLT_CHR, // Oracle type
SQLCS_NCHAR, // charset form SQLCS_NCHAR, // charset form
MAX_STRING_CHARS, // element length (default) MAX_STRING_CHARS, // element length (default)
@ -171,7 +221,7 @@ static udt_VariableType vt_NationalCharString = {
static udt_VariableType vt_FixedChar = { static udt_VariableType vt_FixedChar = {
(InitializeProc) StringVar_Initialize, (InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL, (FinalizeProc) NULL,
(PreDefineProc) StringVar_PreDefine, (PreDefineProc) NULL,
(PostDefineProc) NULL, (PostDefineProc) NULL,
(IsNullProc) NULL, (IsNullProc) NULL,
(SetValueProc) StringVar_SetValue, (SetValueProc) StringVar_SetValue,
@ -186,6 +236,24 @@ static udt_VariableType vt_FixedChar = {
}; };
static udt_VariableType vt_FixedNationalChar = {
(InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL,
(PreDefineProc) NULL,
(PostDefineProc) StringVar_PostDefine,
(IsNullProc) NULL,
(SetValueProc) StringVar_SetValue,
(GetValueProc) StringVar_GetValue,
&g_FixedUnicodeVarType, // Python type
SQLT_AFC, // Oracle type
SQLCS_NCHAR, // charset form
2000, // element length (default)
1, // is variable length
1, // can be copied
1 // can be in array
};
static udt_VariableType vt_Rowid = { static udt_VariableType vt_Rowid = {
(InitializeProc) StringVar_Initialize, (InitializeProc) StringVar_Initialize,
(FinalizeProc) NULL, (FinalizeProc) NULL,
@ -241,25 +309,28 @@ static int StringVar_Initialize(
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// StringVar_PreDefine() // StringVar_PostDefine()
// Set the character set information when values are fetched from this // Set the character set information when values are fetched from this
// variable. // variable.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
static int StringVar_PreDefine( static int StringVar_PostDefine(
udt_StringVar *var, // variable to initialize udt_StringVar *var) // variable to initialize
OCIParam *param) // parameter handle
{ {
ub2 charsetId;
sword status; sword status;
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &var->charsetForm, status = OCIAttrSet(var->defineHandle, OCI_HTYPE_DEFINE,
0, OCI_ATTR_CHARSET_FORM, var->environment->errorHandle); &var->type->charsetForm, 0, OCI_ATTR_CHARSET_FORM,
var->environment->errorHandle);
if (Environment_CheckForError(var->environment, status, if (Environment_CheckForError(var->environment, status,
"StringVar_PreDefine(): getting charset form") < 0) "StringVar_PostDefine(): setting charset form") < 0)
return -1; return -1;
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &var->charsetId, 0,
charsetId = OCI_UTF16ID;
status = OCIAttrSet(var->defineHandle, OCI_HTYPE_DEFINE, &charsetId, 0,
OCI_ATTR_CHARSET_ID, var->environment->errorHandle); OCI_ATTR_CHARSET_ID, var->environment->errorHandle);
if (Environment_CheckForError(var->environment, status, if (Environment_CheckForError(var->environment, status,
"StringVar_PreDefine(): getting charset id") < 0) "StringVar_PostDefine(): setting charset id") < 0)
return -1; return -1;
return 0; return 0;
@ -275,33 +346,62 @@ static int StringVar_SetValue(
unsigned pos, // array position to set unsigned pos, // array position to set
PyObject *value) // value to set PyObject *value) // value to set
{ {
PyObject *encodedString;
Py_ssize_t bufferSize; Py_ssize_t bufferSize;
const void *buffer; const void *buffer;
ub2 actualLength;
// get the buffer data and size for binding // get the buffer data and size for binding
if (PyString_Check(value)) { encodedString = NULL;
buffer = PyString_AS_STRING(value); if (var->type->charsetForm == SQLCS_IMPLICIT) {
bufferSize = PyString_GET_SIZE(value); if (PyString_Check(value)) {
} else if (PyBuffer_Check(value)) { buffer = PyString_AS_STRING(value);
if (PyObject_AsReadBuffer(value, &buffer, &bufferSize) < 0) bufferSize = PyString_GET_SIZE(value);
} else if (PyBuffer_Check(value)) {
if (PyObject_AsReadBuffer(value, &buffer, &bufferSize) < 0)
return -1;
} else {
PyErr_SetString(PyExc_TypeError,
"expecting string or buffer data");
return -1; return -1;
}
actualLength = (ub2) bufferSize;
} else { } else {
PyErr_SetString(PyExc_TypeError, "expecting string or buffer data"); if (!PyUnicode_Check(value)) {
return -1; PyErr_SetString(PyExc_TypeError, "expecting unicode data");
return -1;
}
#ifdef Py_UNICODE_WIDE
int one = 1;
int byteOrder = (IS_LITTLE_ENDIAN) ? -1 : 1;
encodedString = PyUnicode_EncodeUTF16(PyUnicode_AS_UNICODE(value),
PyUnicode_GET_SIZE(value), NULL, byteOrder);
if (!encodedString)
return -1;
buffer = PyString_AS_STRING(encodedString);
bufferSize = PyString_GET_SIZE(encodedString);
#else
buffer = PyUnicode_AS_UNICODE(value);
bufferSize = sizeof(Py_UNICODE) * PyUnicode_GET_SIZE(value);
#endif
actualLength = bufferSize / 2;
} }
// ensure that the buffer is not too large // ensure that the buffer is not too large
if (bufferSize > var->maxLength) { if (bufferSize > var->maxLength) {
if (bufferSize > var->environment->maxStringBytes) { if (bufferSize > var->environment->maxStringBytes) {
PyErr_SetString(PyExc_ValueError, "string data too large"); PyErr_SetString(PyExc_ValueError, "string data too large");
Py_XDECREF(encodedString);
return -1; return -1;
} }
if (Variable_Resize( (udt_Variable*) var, bufferSize) < 0) if (Variable_Resize( (udt_Variable*) var, bufferSize) < 0) {
Py_XDECREF(encodedString);
return -1; return -1;
}
} }
// keep a copy of the string // keep a copy of the string
var->actualLength[pos] = (ub2) bufferSize; var->actualLength[pos] = actualLength;
if (bufferSize) if (bufferSize)
memcpy(var->data + var->maxLength * pos, buffer, bufferSize); memcpy(var->data + var->maxLength * pos, buffer, bufferSize);
@ -317,7 +417,16 @@ static PyObject *StringVar_GetValue(
udt_StringVar *var, // variable to determine value for udt_StringVar *var, // variable to determine value for
unsigned pos) // array position unsigned pos) // array position
{ {
return PyString_FromStringAndSize(var->data + pos * var->maxLength, char *data;
var->actualLength[pos]);
data = var->data + pos * var->maxLength;
if (var->type->charsetForm == SQLCS_IMPLICIT)
return PyString_FromStringAndSize(data, var->actualLength[pos]);
#ifdef Py_UNICODE_WIDE
ub4 bytes = var->actualLength[pos] * 2;
return PyUnicode_DecodeUTF16(data, bytes, NULL, NULL);
#else
return PyUnicode_FromUnicode((Py_UNICODE*) data, var->actualLength[pos]);
#endif
} }

View File

@ -150,8 +150,10 @@ static udt_Variable *Variable_New(
if (type->isVariableLength) { if (type->isVariableLength) {
if (elementLength < sizeof(ub2)) if (elementLength < sizeof(ub2))
elementLength = sizeof(ub2); elementLength = sizeof(ub2);
var->maxLength = if (type->charsetForm == SQLCS_IMPLICIT)
elementLength * cursor->environment->maxBytesPerCharacter; var->maxLength =
elementLength * cursor->environment->maxBytesPerCharacter;
else var->maxLength = elementLength * 2;
} }
// allocate the indicator and data // allocate the indicator and data
@ -276,6 +278,8 @@ static int Variable_Check(
object->ob_type == &g_NumberVarType || object->ob_type == &g_NumberVarType ||
object->ob_type == &g_StringVarType || object->ob_type == &g_StringVarType ||
object->ob_type == &g_FixedCharVarType || object->ob_type == &g_FixedCharVarType ||
object->ob_type == &g_UnicodeVarType ||
object->ob_type == &g_FixedUnicodeVarType ||
object->ob_type == &g_RowidVarType || object->ob_type == &g_RowidVarType ||
object->ob_type == &g_BinaryVarType || object->ob_type == &g_BinaryVarType ||
object->ob_type == &g_TimestampVarType object->ob_type == &g_TimestampVarType
@ -301,6 +305,12 @@ static udt_VariableType *Variable_TypeByPythonType(
return &vt_String; return &vt_String;
if (type == (PyObject*) &g_FixedCharVarType) if (type == (PyObject*) &g_FixedCharVarType)
return &vt_FixedChar; return &vt_FixedChar;
if (type == (PyObject*) &g_UnicodeVarType)
return &vt_NationalCharString;
if (type == (PyObject*) &PyUnicode_Type)
return &vt_NationalCharString;
if (type == (PyObject*) &g_FixedUnicodeVarType)
return &vt_FixedNationalChar;
if (type == (PyObject*) &g_RowidVarType) if (type == (PyObject*) &g_RowidVarType)
return &vt_Rowid; return &vt_Rowid;
if (type == (PyObject*) &g_BinaryVarType) if (type == (PyObject*) &g_BinaryVarType)
@ -375,6 +385,8 @@ static udt_VariableType *Variable_TypeByValue(
return &vt_String; return &vt_String;
if (PyString_Check(value)) if (PyString_Check(value))
return &vt_String; return &vt_String;
if (PyUnicode_Check(value))
return &vt_NationalCharString;
if (PyInt_Check(value)) if (PyInt_Check(value))
return &vt_Integer; return &vt_Integer;
if (PyLong_Check(value)) if (PyLong_Check(value))
@ -439,6 +451,8 @@ static udt_VariableType *Variable_TypeByOracleDataType (
case SQLT_LNG: case SQLT_LNG:
return &vt_LongString; return &vt_LongString;
case SQLT_AFC: case SQLT_AFC:
if (charsetForm == SQLCS_NCHAR)
return &vt_FixedNationalChar;
return &vt_FixedChar; return &vt_FixedChar;
case SQLT_CHR: case SQLT_CHR:
if (charsetForm == SQLCS_NCHAR) if (charsetForm == SQLCS_NCHAR)
@ -489,6 +503,41 @@ static udt_VariableType *Variable_TypeByOracleDataType (
} }
//-----------------------------------------------------------------------------
// Variable_TypeByOracleDescriptor()
// Return a variable type given an Oracle descriptor.
//-----------------------------------------------------------------------------
static udt_VariableType *Variable_TypeByOracleDescriptor(
OCIParam *param, // parameter to get type from
udt_Environment *environment) // environment to use
{
ub1 charsetForm;
ub2 dataType;
sword status;
// retrieve datatype of the parameter
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &dataType, 0,
OCI_ATTR_DATA_TYPE, environment->errorHandle);
if (Environment_CheckForError(environment, status,
"Variable_TypeByOracleDescriptor(): data type") < 0)
return NULL;
// retrieve character set form of the parameter
if (dataType != SQLT_CHR && dataType != SQLT_AFC &&
dataType != SQLT_CLOB) {
charsetForm = SQLCS_IMPLICIT;
} else {
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &charsetForm,
0, OCI_ATTR_CHARSET_FORM, environment->errorHandle);
if (Environment_CheckForError(environment, status,
"Variable_TypeByOracleDescriptor(): charset form") < 0)
return NULL;
}
return Variable_TypeByOracleDataType(dataType, charsetForm);
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Variable_MakeArray() // Variable_MakeArray()
// Make the variable an array, ensuring that the type supports arrays. // Make the variable an array, ensuring that the type supports arrays.
@ -713,33 +762,14 @@ static udt_Variable *Variable_DefineHelper(
unsigned position, // position in define list unsigned position, // position in define list
unsigned numElements) // number of elements to create unsigned numElements) // number of elements to create
{ {
ub2 dataType, lengthFromOracle;
udt_VariableType *varType; udt_VariableType *varType;
ub2 lengthFromOracle;
udt_Variable *var; udt_Variable *var;
ub1 charsetForm;
ub4 maxLength; ub4 maxLength;
sword status; sword status;
// retrieve datatype of the parameter
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &dataType, 0,
OCI_ATTR_DATA_TYPE, cursor->environment->errorHandle);
if (Environment_CheckForError(cursor->environment, status,
"Variable_Define(): data type") < 0)
return NULL;
// retrieve character set form of the parameter
if (dataType != SQLT_CHR && dataType != SQLT_CLOB) {
charsetForm = SQLCS_IMPLICIT;
} else {
status = OCIAttrGet(param, OCI_HTYPE_DESCRIBE, (dvoid*) &charsetForm,
0, OCI_ATTR_CHARSET_FORM, cursor->environment->errorHandle);
if (Environment_CheckForError(cursor->environment, status,
"Variable_Define(): charset form") < 0)
return NULL;
}
// determine data type // determine data type
varType = Variable_TypeByOracleDataType(dataType, charsetForm); varType = Variable_TypeByOracleDescriptor(param, cursor->environment);
if (!varType) if (!varType)
return NULL; return NULL;
if (cursor->numbersAsStrings && varType == &vt_Float) if (cursor->numbersAsStrings && varType == &vt_Float)
@ -849,6 +879,7 @@ static udt_Variable *Variable_Define(
static int Variable_InternalBind( static int Variable_InternalBind(
udt_Variable *var) // variable to bind udt_Variable *var) // variable to bind
{ {
ub2 charsetId;
sword status; sword status;
// perform the bind // perform the bind
@ -887,7 +918,7 @@ static int Variable_InternalBind(
"Variable_InternalBind()") < 0) "Variable_InternalBind()") < 0)
return -1; return -1;
// set the charset form if applicable // set the charset form and id if applicable
if (var->type->charsetForm != SQLCS_IMPLICIT) { if (var->type->charsetForm != SQLCS_IMPLICIT) {
status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND, status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND,
(dvoid*) &var->type->charsetForm, 0, OCI_ATTR_CHARSET_FORM, (dvoid*) &var->type->charsetForm, 0, OCI_ATTR_CHARSET_FORM,
@ -895,6 +926,20 @@ static int Variable_InternalBind(
if (Environment_CheckForError(var->environment, status, if (Environment_CheckForError(var->environment, status,
"Variable_InternalBind(): set charset form") < 0) "Variable_InternalBind(): set charset form") < 0)
return -1; return -1;
charsetId = OCI_UTF16ID;
status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND,
(dvoid*) &charsetId, 0, OCI_ATTR_CHARSET_ID,
var->environment->errorHandle);
if (Environment_CheckForError(var->environment, status,
"Variable_InternalBind(): setting charset Id") < 0)
return -1;
ub4 lengthInChars = var->maxLength / 2;
status = OCIAttrSet(var->bindHandle, OCI_HTYPE_BIND,
(dvoid*) &lengthInChars, 0, OCI_ATTR_CHAR_COUNT,
var->environment->errorHandle);
if (Environment_CheckForError(var->environment, status,
"Variable_InternalBind(): set char count") < 0)
return -1;
} }
// set the max data size for strings // set the max data size for strings

View File

@ -39,6 +39,10 @@ typedef int Py_ssize_t;
#define PY_SSIZE_T_MIN INT_MIN #define PY_SSIZE_T_MIN INT_MIN
#endif #endif
// define simple construct for determining endianness of the platform
// Oracle uses native encoding with OCI_UTF16 but bails when a BOM is written
#define IS_LITTLE_ENDIAN (int)*(unsigned char*) &one
// define macro for adding OCI constants // define macro for adding OCI constants
#define ADD_OCI_CONSTANT(x) \ #define ADD_OCI_CONSTANT(x) \
if (PyModule_AddIntConstant(module, #x, OCI_ ##x) < 0) \ if (PyModule_AddIntConstant(module, #x, OCI_ ##x) < 0) \
@ -50,6 +54,11 @@ typedef int Py_ssize_t;
if (PyModule_AddObject(module, name, (PyObject*) type) < 0) \ if (PyModule_AddObject(module, name, (PyObject*) type) < 0) \
return; return;
// define macro for making a type ready
#define MAKE_TYPE_READY(type) \
if (PyType_Ready(type) < 0) \
return;
// define macros to get the build version as a string and the driver name // define macros to get the build version as a string and the driver name
#define xstr(s) str(s) #define xstr(s) str(s)
#define str(s) #s #define str(s) #s
@ -323,61 +332,37 @@ void initcx_Oracle(void)
PyErr_Clear(); PyErr_Clear();
// prepare the types for use by the module // prepare the types for use by the module
if (PyType_Ready(&g_ConnectionType) < 0) MAKE_TYPE_READY(&g_ConnectionType);
return; MAKE_TYPE_READY(&g_CursorType);
if (PyType_Ready(&g_CursorType) < 0) MAKE_TYPE_READY(&g_ErrorType);
return;
if (PyType_Ready(&g_ErrorType) < 0)
return;
#ifndef NATIVE_DATETIME #ifndef NATIVE_DATETIME
if (PyType_Ready(&g_ExternalDateTimeVarType) < 0) MAKE_TYPE_READY(&g_ExternalDateTimeVarType);
return;
#endif #endif
if (PyType_Ready(&g_SessionPoolType) < 0) MAKE_TYPE_READY(&g_SessionPoolType);
return; MAKE_TYPE_READY(&g_TimestampVarType);
if (PyType_Ready(&g_TimestampVarType) < 0) MAKE_TYPE_READY(&g_EnvironmentType);
return; MAKE_TYPE_READY(&g_ObjectTypeType);
if (PyType_Ready(&g_EnvironmentType) < 0) MAKE_TYPE_READY(&g_ObjectAttributeType);
return; MAKE_TYPE_READY(&g_StringVarType);
if (PyType_Ready(&g_ObjectTypeType) < 0) MAKE_TYPE_READY(&g_FixedCharVarType);
return; MAKE_TYPE_READY(&g_RowidVarType);
if (PyType_Ready(&g_ObjectAttributeType) < 0) MAKE_TYPE_READY(&g_BinaryVarType);
return; MAKE_TYPE_READY(&g_LongStringVarType);
if (PyType_Ready(&g_StringVarType) < 0) MAKE_TYPE_READY(&g_LongBinaryVarType);
return; MAKE_TYPE_READY(&g_NumberVarType);
if (PyType_Ready(&g_FixedCharVarType) < 0) MAKE_TYPE_READY(&g_ExternalLobVarType);
return; MAKE_TYPE_READY(&g_DateTimeVarType);
if (PyType_Ready(&g_RowidVarType) < 0) MAKE_TYPE_READY(&g_CLOBVarType);
return; MAKE_TYPE_READY(&g_NCLOBVarType);
if (PyType_Ready(&g_BinaryVarType) < 0) MAKE_TYPE_READY(&g_BLOBVarType);
return; MAKE_TYPE_READY(&g_BFILEVarType);
if (PyType_Ready(&g_LongStringVarType) < 0) MAKE_TYPE_READY(&g_CursorVarType);
return; MAKE_TYPE_READY(&g_ObjectVarType);
if (PyType_Ready(&g_LongBinaryVarType) < 0) MAKE_TYPE_READY(&g_ExternalObjectVarType);
return; MAKE_TYPE_READY(&g_UnicodeVarType);
if (PyType_Ready(&g_NumberVarType) < 0) MAKE_TYPE_READY(&g_FixedUnicodeVarType);
return;
if (PyType_Ready(&g_ExternalLobVarType) < 0)
return;
if (PyType_Ready(&g_DateTimeVarType) < 0)
return;
if (PyType_Ready(&g_CLOBVarType) < 0)
return;
if (PyType_Ready(&g_NCLOBVarType) < 0)
return;
if (PyType_Ready(&g_BLOBVarType) < 0)
return;
if (PyType_Ready(&g_BFILEVarType) < 0)
return;
if (PyType_Ready(&g_CursorVarType) < 0)
return;
if (PyType_Ready(&g_ObjectVarType) < 0)
return;
if (PyType_Ready(&g_ExternalObjectVarType) < 0)
return;
#ifdef SQLT_BFLOAT #ifdef SQLT_BFLOAT
if (PyType_Ready(&g_NativeFloatVarType) < 0) MAKE_TYPE_READY(&g_NativeFloatVarType);
return;
#endif #endif
// initialize module and retrieve the dictionary // initialize module and retrieve the dictionary
@ -446,6 +431,7 @@ void initcx_Oracle(void)
ADD_TYPE_OBJECT("DATETIME", &g_ExternalDateTimeVarType) ADD_TYPE_OBJECT("DATETIME", &g_ExternalDateTimeVarType)
#endif #endif
ADD_TYPE_OBJECT("FIXED_CHAR", &g_FixedCharVarType) ADD_TYPE_OBJECT("FIXED_CHAR", &g_FixedCharVarType)
ADD_TYPE_OBJECT("FIXED_UNICODE", &g_FixedUnicodeVarType)
ADD_TYPE_OBJECT("LOB", &g_ExternalLobVarType) ADD_TYPE_OBJECT("LOB", &g_ExternalLobVarType)
ADD_TYPE_OBJECT("LONG_BINARY", &g_LongBinaryVarType) ADD_TYPE_OBJECT("LONG_BINARY", &g_LongBinaryVarType)
ADD_TYPE_OBJECT("LONG_STRING", &g_LongStringVarType) ADD_TYPE_OBJECT("LONG_STRING", &g_LongStringVarType)
@ -454,6 +440,7 @@ void initcx_Oracle(void)
ADD_TYPE_OBJECT("ROWID", &g_RowidVarType) ADD_TYPE_OBJECT("ROWID", &g_RowidVarType)
ADD_TYPE_OBJECT("STRING", &g_StringVarType) ADD_TYPE_OBJECT("STRING", &g_StringVarType)
ADD_TYPE_OBJECT("TIMESTAMP", &g_TimestampVarType) ADD_TYPE_OBJECT("TIMESTAMP", &g_TimestampVarType)
ADD_TYPE_OBJECT("UNICODE", &g_UnicodeVarType)
#ifdef SQLT_BFLOAT #ifdef SQLT_BFLOAT
ADD_TYPE_OBJECT("NATIVE_FLOAT", &g_NativeFloatVarType) ADD_TYPE_OBJECT("NATIVE_FLOAT", &g_NativeFloatVarType)
#endif #endif

View File

@ -6,6 +6,18 @@
whenever sqlerror exit failure whenever sqlerror exit failure
-- drop existing users, if present
begin
for r in
( select username
from dba_users
where username in ('CX_ORACLE', 'CX_ORACLE_PROXY')
) loop
execute immediate 'drop user ' || r.username || ' cascade';
end loop;
end;
/
alter session set nls_date_format = 'YYYY-MM-DD HH24:MI:SS'; alter session set nls_date_format = 'YYYY-MM-DD HH24:MI:SS';
alter session set nls_numeric_characters='.,'; alter session set nls_numeric_characters='.,';
@ -55,6 +67,13 @@ create table cx_Oracle.TestStrings (
NullableCol varchar2(50) NullableCol varchar2(50)
) tablespace users; ) tablespace users;
create table cx_Oracle.TestUnicodes (
IntCol number(9) not null,
UnicodeCol nvarchar2(20) not null,
FixedUnicodeCol nchar(40) not null,
NullableCol nvarchar2(50)
) tablespace users;
create table cx_Oracle.TestDates ( create table cx_Oracle.TestDates (
IntCol number(9) not null, IntCol number(9) not null,
DateCol date not null, DateCol date not null,
@ -149,6 +168,16 @@ begin
end; end;
/ /
begin
for i in 1..10 loop
insert into cx_Oracle.TestUnicodes
values (i, 'Unicode ' || unistr('\3042') || ' ' || to_char(i),
'Fixed Unicode ' || to_char(i),
decode(mod(i, 2), 0, null, 'Nullable ' || to_char(i)));
end loop;
end;
/
begin begin
for i in 1..10 loop for i in 1..10 loop
insert into cx_Oracle.TestDates insert into cx_Oracle.TestDates
@ -287,6 +316,69 @@ create or replace package body cx_Oracle.pkg_TestStringArrays as
end; end;
/ /
create or replace package cx_Oracle.pkg_TestUnicodeArrays as
type udt_UnicodeList is table of nvarchar2(100) index by binary_integer;
function TestInArrays (
a_StartingLength number,
a_Array udt_UnicodeList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_UnicodeList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_UnicodeList
);
end;
/
create or replace package body cx_Oracle.pkg_TestUnicodeArrays as
function TestInArrays (
a_StartingLength number,
a_Array udt_UnicodeList
) return number is
t_Length number;
begin
t_Length := a_StartingLength;
for i in 1..a_Array.count loop
t_Length := t_Length + length(a_Array(i));
end loop;
return t_Length;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_UnicodeList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := unistr('Converted element ' || unistr('\3042') ||
' # ') || to_char(i) || ' originally had length ' ||
to_char(length(a_Array(i)));
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_UnicodeList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := unistr('Test out element ') || unistr('\3042') || ' # ' ||
to_char(i);
end loop;
end;
end;
/
create or replace package cx_Oracle.pkg_TestNumberArrays as create or replace package cx_Oracle.pkg_TestNumberArrays as
type udt_NumberList is table of number index by binary_integer; type udt_NumberList is table of number index by binary_integer;

238
test/UnicodeVar.py Normal file
View File

@ -0,0 +1,238 @@
"""Module for testing unicode variables."""
class TestUnicodeVar(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.rawData = []
self.dataByKey = {}
for i in range(1, 11):
unicodeCol = u"Unicode \u3042 %d" % i
fixedCharCol = (u"Fixed Unicode %d" % i).ljust(40)
if i % 2:
nullableCol = u"Nullable %d" % i
else:
nullableCol = None
dataTuple = (i, unicodeCol, fixedCharCol, nullableCol)
self.rawData.append(dataTuple)
self.dataByKey[i] = dataTuple
def testUnicodeLength(self):
"test value length"
returnValue = self.cursor.var(int)
self.cursor.execute("""
begin
:retval := LENGTH(:value);
end;""",
value = u"InVal \u3042",
retval = returnValue)
self.failUnlessEqual(returnValue.getvalue(), 7)
def testBindUnicode(self):
"test binding in a unicode"
self.cursor.execute("""
select * from TestUnicodes
where UnicodeCol = :value""",
value = u"Unicode \u3042 5")
self.failUnlessEqual(self.cursor.fetchall(), [self.dataByKey[5]])
def testBindDifferentVar(self):
"test binding a different variable on second execution"
retval_1 = self.cursor.var(cx_Oracle.UNICODE, 30)
retval_2 = self.cursor.var(cx_Oracle.UNICODE, 30)
self.cursor.execute(r"begin :retval := unistr('Called \3042'); end;",
retval = retval_1)
self.failUnlessEqual(retval_1.getvalue(), u"Called \u3042")
self.cursor.execute("begin :retval := 'Called'; end;",
retval = retval_2)
self.failUnlessEqual(retval_2.getvalue(), "Called")
def testBindUnicodeAfterNumber(self):
"test binding in a unicode after setting input sizes to a number"
self.cursor.setinputsizes(value = cx_Oracle.NUMBER)
self.cursor.execute("""
select * from TestUnicodes
where UnicodeCol = :value""",
value = u"Unicode \u3042 6")
self.failUnlessEqual(self.cursor.fetchall(), [self.dataByKey[6]])
def testBindUnicodeArrayDirect(self):
"test binding in a unicode array"
returnValue = self.cursor.var(cx_Oracle.NUMBER)
array = [r[1] for r in self.rawData]
statement = """
begin
:retval := pkg_TestUnicodeArrays.TestInArrays(
:integerValue, :array);
end;"""
self.cursor.execute(statement,
retval = returnValue,
integerValue = 5,
array = array)
self.failUnlessEqual(returnValue.getvalue(), 116)
array = [ u"Unicode - \u3042 %d" % i for i in range(15) ]
self.cursor.execute(statement,
integerValue = 8,
array = array)
self.failUnlessEqual(returnValue.getvalue(), 208)
def testBindUnicodeArrayBySizes(self):
"test binding in a unicode array (with setinputsizes)"
returnValue = self.cursor.var(cx_Oracle.NUMBER)
self.cursor.setinputsizes(array = [cx_Oracle.UNICODE, 10])
array = [r[1] for r in self.rawData]
self.cursor.execute("""
begin
:retval := pkg_TestUnicodeArrays.TestInArrays(:integerValue,
:array);
end;""",
retval = returnValue,
integerValue = 6,
array = array)
self.failUnlessEqual(returnValue.getvalue(), 117)
def testBindUnicodeArrayByVar(self):
"test binding in a unicode array (with arrayvar)"
returnValue = self.cursor.var(cx_Oracle.NUMBER)
array = self.cursor.arrayvar(cx_Oracle.UNICODE, 10, 20)
array.setvalue(0, [r[1] for r in self.rawData])
self.cursor.execute("""
begin
:retval := pkg_TestUnicodeArrays.TestInArrays(:integerValue,
:array);
end;""",
retval = returnValue,
integerValue = 7,
array = array)
self.failUnlessEqual(returnValue.getvalue(), 118)
def testBindInOutUnicodeArrayByVar(self):
"test binding in/out a unicode array (with arrayvar)"
array = self.cursor.arrayvar(cx_Oracle.UNICODE, 10, 100)
originalData = [r[1] for r in self.rawData]
format = u"Converted element \u3042 # %d originally had length %d"
expectedData = [format % (i, len(originalData[i - 1])) \
for i in range(1, 6)] + originalData[5:]
array.setvalue(0, originalData)
self.cursor.execute("""
begin
pkg_TestUnicodeArrays.TestInOutArrays(:numElems, :array);
end;""",
numElems = 5,
array = array)
self.failUnlessEqual(array.getvalue(), expectedData)
def testBindOutUnicodeArrayByVar(self):
"test binding out a unicode array (with arrayvar)"
array = self.cursor.arrayvar(cx_Oracle.UNICODE, 6, 100)
format = u"Test out element \u3042 # %d"
expectedData = [format % i for i in range(1, 7)]
self.cursor.execute("""
begin
pkg_TestUnicodeArrays.TestOutArrays(:numElems, :array);
end;""",
numElems = 6,
array = array)
self.failUnlessEqual(array.getvalue(), expectedData)
def testBindNull(self):
"test binding in a null"
self.cursor.execute("""
select * from TestUnicodes
where UnicodeCol = :value""",
value = None)
self.failUnlessEqual(self.cursor.fetchall(), [])
def testBindOutSetInputSizesByType(self):
"test binding out with set input sizes defined (by type)"
vars = self.cursor.setinputsizes(value = cx_Oracle.UNICODE)
self.cursor.execute(r"""
begin
:value := unistr('TSI \3042');
end;""")
self.failUnlessEqual(vars["value"].getvalue(), u"TSI \u3042")
def testBindInOutSetInputSizesByType(self):
"test binding in/out with set input sizes defined (by type)"
vars = self.cursor.setinputsizes(value = cx_Oracle.UNICODE)
self.cursor.execute(r"""
begin
:value := :value || unistr(' TSI \3042');
end;""",
value = u"InVal \u3041")
self.failUnlessEqual(vars["value"].getvalue(),
u"InVal \u3041 TSI \u3042")
def testBindOutVar(self):
"test binding out with cursor.var() method"
var = self.cursor.var(cx_Oracle.UNICODE)
self.cursor.execute(r"""
begin
:value := unistr('TSI (VAR) \3042');
end;""",
value = var)
self.failUnlessEqual(var.getvalue(), u"TSI (VAR) \u3042")
def testBindInOutVarDirectSet(self):
"test binding in/out with cursor.var() method"
var = self.cursor.var(cx_Oracle.UNICODE)
var.setvalue(0, u"InVal \u3041")
self.cursor.execute(r"""
begin
:value := :value || unistr(' TSI (VAR) \3042');
end;""",
value = var)
self.failUnlessEqual(var.getvalue(), u"InVal \u3041 TSI (VAR) \u3042")
def testUnicodeMaximumReached(self):
"test that an error is raised when maximum unicode length exceeded"
var = self.cursor.setinputsizes(test = cx_Oracle.UNICODE)["test"]
inUnicode = u"1234567890" * 400
var.setvalue(0, inUnicode)
outUnicode = var.getvalue()
self.failUnlessEqual(inUnicode, outUnicode,
"output does not match: in was %d, out was %d" % \
(len(inUnicode), len(outUnicode)))
inUnicode = inUnicode + u"0"
self.failUnlessRaises(ValueError, var.setvalue, 0, inUnicode)
def testCursorDescription(self):
"test cursor description is accurate"
self.cursor.execute("select * from TestUnicodes")
self.failUnlessEqual(self.cursor.description,
[ ('INTCOL', cx_Oracle.NUMBER, 10, 22, 9, 0, 0),
('UNICODECOL', cx_Oracle.UNICODE, 20, 40, 0, 0, 0),
('FIXEDUNICODECOL',
cx_Oracle.FIXED_UNICODE, 40, 80, 0, 0, 0),
('NULLABLECOL', cx_Oracle.UNICODE, 50, 100, 0, 0, 1) ])
def testFetchAll(self):
"test that fetching all of the data returns the correct results"
self.cursor.execute("select * From TestUnicodes order by IntCol")
self.failUnlessEqual(self.cursor.fetchall(), self.rawData)
self.failUnlessEqual(self.cursor.fetchall(), [])
def testFetchMany(self):
"test that fetching data in chunks returns the correct results"
self.cursor.execute("select * From TestUnicodes order by IntCol")
self.failUnlessEqual(self.cursor.fetchmany(3), self.rawData[0:3])
self.failUnlessEqual(self.cursor.fetchmany(2), self.rawData[3:5])
self.failUnlessEqual(self.cursor.fetchmany(4), self.rawData[5:9])
self.failUnlessEqual(self.cursor.fetchmany(3), self.rawData[9:])
self.failUnlessEqual(self.cursor.fetchmany(3), [])
def testFetchOne(self):
"test that fetching a single row returns the correct results"
self.cursor.execute("""
select *
from TestUnicodes
where IntCol in (3, 4)
order by IntCol""")
self.failUnlessEqual(self.cursor.fetchone(), self.dataByKey[3])
self.failUnlessEqual(self.cursor.fetchone(), self.dataByKey[4])
self.failUnlessEqual(self.cursor.fetchone(), None)
if __name__ == "__main__":
print "Testing cx_Oracle version", cx_Oracle.version
unittest.main()

View File

@ -21,7 +21,8 @@ moduleNames = [
"ObjectVar", "ObjectVar",
"SessionPool", "SessionPool",
"StringVar", "StringVar",
"TimestampVar" "TimestampVar",
"UnicodeVar"
] ]
class BaseTestCase(unittest.TestCase): class BaseTestCase(unittest.TestCase):