Adjusted test cases to make use of encoding and nencoding parameters to both

standalone connection and session pool creation; added test cases for binding
and fetching supplemental characters as well as fetching temporary LOBs; tidied
up a few test cases as well.
This commit is contained in:
Anthony Tuininga 2017-01-13 18:14:45 -07:00
parent 2b9e80308e
commit 8fda704186
7 changed files with 120 additions and 66 deletions

View File

@ -214,14 +214,13 @@ class TestCursor(BaseTestCase):
value, = testIter.next()
self.cursor.execute("insert into TestTempTable (IntCol) values (1)")
if sys.version_info[0] >= 3:
self.assertRaises(cx_Oracle.InterfaceError, next, testIter)
self.assertRaises(cx_Oracle.InterfaceError, next, testIter)
else:
self.assertRaises(cx_Oracle.InterfaceError, testIter.next)
self.assertRaises(cx_Oracle.InterfaceError, testIter.next)
def testBindNames(self):
"""test that bindnames() works correctly."""
self.assertRaises(cx_Oracle.ProgrammingError,
self.cursor.bindnames)
self.assertRaises(cx_Oracle.ProgrammingError, self.cursor.bindnames)
self.cursor.prepare("begin null; end;")
self.assertEqual(self.cursor.bindnames(), [])
self.cursor.prepare("begin :retval := :inval + 5; end;")

View File

@ -4,10 +4,21 @@ import sys
class TestLobVar(BaseTestCase):
def __PerformTest(self, type, inputType):
def __GetTempLobs(self, sid):
cursor = self.connection.cursor()
cursor.execute("""
select abstract_lobs
from v$temporary_lobs
where sid = :sid""", sid = sid)
row = cursor.fetchone()
if row is None:
return 0
return int(row[0])
def __PerformTest(self, lobType, inputType):
longString = ""
directType = getattr(cx_Oracle, type)
self.cursor.execute("truncate table Test%ss" % type)
directType = getattr(cx_Oracle, lobType)
self.cursor.execute("truncate table Test%ss" % lobType)
for i in range(0, 11):
if i > 0:
char = chr(ord('A') + i - 1)
@ -15,7 +26,7 @@ class TestLobVar(BaseTestCase):
elif inputType != directType:
continue
self.cursor.setinputsizes(longString = inputType)
if type == "BLOB" and sys.version_info[0] >= 3:
if lobType == "BLOB" and sys.version_info[0] >= 3:
bindValue = longString.encode("ascii")
else:
bindValue = longString
@ -26,48 +37,21 @@ class TestLobVar(BaseTestCase):
) values (
:integerValue,
:longString
)""" % (type, type),
)""" % (lobType, lobType),
integerValue = i,
longString = bindValue)
self.connection.commit()
self.cursor.execute("""
select *
from Test%ss
order by IntCol""" % type)
longString = ""
for row in self.cursor:
integerValue, lob = row
if integerValue == 0:
self.assertEqual(lob.size(), 0)
expectedValue = ""
if type == "BLOB" and sys.version_info[0] >= 3:
expectedValue = expectedValue.encode("ascii")
self.assertEqual(lob.read(), expectedValue)
else:
char = chr(ord('A') + integerValue - 1)
prevChar = chr(ord('A') + integerValue - 2)
longString += char * 25000
if type == "BLOB" and sys.version_info[0] >= 3:
actualValue = longString.encode("ascii")
char = char.encode("ascii")
prevChar = prevChar.encode("ascii")
else:
actualValue = longString
self.assertEqual(lob.size(), len(actualValue))
self.assertEqual(lob.read(), actualValue)
if type == "CLOB":
self.assertEqual(str(lob), actualValue)
self.assertEqual(lob.read(len(actualValue)), char)
if integerValue > 1:
offset = (integerValue - 1) * 25000 - 4
string = prevChar * 5 + char * 5
self.assertEqual(lob.read(offset, 10), string)
order by IntCol""" % lobType)
self.__ValidateQuery(self.cursor, lobType)
def __TestTrim(self, type):
self.cursor.execute("truncate table Test%ss" % type)
self.cursor.setinputsizes(longString = getattr(cx_Oracle, type))
def __TestTrim(self, lobType):
self.cursor.execute("truncate table Test%ss" % lobType)
self.cursor.setinputsizes(longString = getattr(cx_Oracle, lobType))
longString = "X" * 75000
if type == "BLOB" and sys.version_info[0] >= 3:
if lobType == "BLOB" and sys.version_info[0] >= 3:
longString = longString.encode("ascii")
self.cursor.execute("""
insert into Test%ss (
@ -76,13 +60,13 @@ class TestLobVar(BaseTestCase):
) values (
:integerValue,
:longString
)""" % (type, type),
)""" % (lobType, lobType),
integerValue = 1,
longString = longString)
self.cursor.execute("""
select %sCol
from Test%ss
where IntCol = 1""" % (type, type))
where IntCol = 1""" % (lobType, lobType))
lob, = self.cursor.fetchone()
self.assertEqual(lob.size(), 75000)
lob.trim(25000)
@ -90,6 +74,36 @@ class TestLobVar(BaseTestCase):
lob.trim()
self.assertEqual(lob.size(), 0)
def __ValidateQuery(self, rows, lobType):
longString = ""
for row in self.cursor:
integerValue, lob = row
if integerValue == 0:
self.assertEqual(lob.size(), 0)
expectedValue = ""
if lobType == "BLOB" and sys.version_info[0] >= 3:
expectedValue = expectedValue.encode("ascii")
self.assertEqual(lob.read(), expectedValue)
else:
char = chr(ord('A') + integerValue - 1)
prevChar = chr(ord('A') + integerValue - 2)
longString += char * 25000
if lobType == "BLOB" and sys.version_info[0] >= 3:
actualValue = longString.encode("ascii")
char = char.encode("ascii")
prevChar = prevChar.encode("ascii")
else:
actualValue = longString
self.assertEqual(lob.size(), len(actualValue))
self.assertEqual(lob.read(), actualValue)
if lobType == "CLOB":
self.assertEqual(str(lob), actualValue)
self.assertEqual(lob.read(len(actualValue)), char)
if integerValue > 1:
offset = (integerValue - 1) * 25000 - 4
string = prevChar * 5 + char * 5
self.assertEqual(lob.read(offset, 10), string)
def testBLOBCursorDescription(self):
"test cursor description is accurate for BLOBs"
self.cursor.execute("select * from TestBLOBs")
@ -131,9 +145,9 @@ class TestLobVar(BaseTestCase):
def testMultipleFetch(self):
"test retrieving data from a CLOB after multiple fetches"
self.cursor.arraysize = 1
self.cursor.execute("select CLOBCol from TestCLOBS")
self.cursor.execute("select * from TestCLOBS")
rows = self.cursor.fetchall()
self.assertRaises(cx_Oracle.ProgrammingError, rows[1][0].read)
self.__ValidateQuery(rows, "CLOB")
def testNCLOBCursorDescription(self):
"test cursor description is accurate for NCLOBs"
@ -154,3 +168,23 @@ class TestLobVar(BaseTestCase):
"test trimming a CLOB"
self.__TestTrim("CLOB")
def testTemporaryLobs(self):
cursor = self.connection.cursor()
cursor.arraysize = self.cursor.arraysize
cursor.execute("""
select sid
from v$session
where audsid = userenv('sessionid')""")
sid, = cursor.fetchone()
tempLobs = self.__GetTempLobs(sid)
self.assertEqual(tempLobs, 0)
cursor.execute("""
select extract(xmlcol, '/').getclobval()
from TestXML""")
for lob, in cursor:
value = lob.read()
del lob
cursor.close()
tempLobs = self.__GetTempLobs(sid)
self.assertEqual(tempLobs, 0)

View File

@ -21,7 +21,8 @@ class TestConnection(TestCase):
def testPool(self):
"""test that the pool is created and has the right attributes"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3,
encoding = ENCODING, nencoding = NENCODING)
self.assertEqual(pool.username, USERNAME, "user name differs")
self.assertEqual(pool.tnsentry, TNSENTRY, "tnsentry differs")
self.assertEqual(pool.max, 8, "max differs")
@ -45,13 +46,15 @@ class TestConnection(TestCase):
def testProxyAuth(self):
"""test that proxy authentication is possible"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3,
encoding = ENCODING, nencoding = NENCODING)
self.assertEqual(pool.homogeneous, 1,
"homogeneous should be 1 by default")
self.assertRaises(cx_Oracle.ProgrammingError, pool.acquire,
user = "proxyuser")
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3,
homogeneous = False)
homogeneous = False, encoding = ENCODING,
nencoding = NENCODING)
self.assertEqual(pool.homogeneous, 0,
"homogeneous should be 0 after setting it in the constructor")
user = "%s_proxy" % USERNAME
@ -63,12 +66,14 @@ class TestConnection(TestCase):
def testRollbackOnDel(self):
"connection rolls back before being destroyed"
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3,
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire()
cursor = connection.cursor()
cursor.execute("truncate table TestTempTable")
cursor.execute("insert into TestTempTable (IntCol) values (1)")
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3,
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire()
cursor = connection.cursor()
cursor.execute("select count(*) from TestTempTable")
@ -77,13 +82,15 @@ class TestConnection(TestCase):
def testRollbackOnRelease(self):
"connection rolls back before released back to the pool"
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3,
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire()
cursor = connection.cursor()
cursor.execute("truncate table TestTempTable")
cursor.execute("insert into TestTempTable (IntCol) values (1)")
pool.release(connection)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3,
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire()
cursor = connection.cursor()
cursor.execute("select count(*) from TestTempTable")
@ -93,7 +100,7 @@ class TestConnection(TestCase):
def testThreading(self):
"""test session pool to database with multiple threads"""
self.pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 5, 20,
2, threaded = True)
2, threaded = True, encoding = ENCODING, nencoding = NENCODING)
threads = []
for i in range(20):
thread = threading.Thread(None, self.__ConnectAndDrop)
@ -105,7 +112,7 @@ class TestConnection(TestCase):
def testThreadingWithErrors(self):
"""test session pool to database with multiple threads (with errors)"""
self.pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 5, 20,
2, threaded = True)
2, threaded = True, encoding = ENCODING, nencoding = NENCODING)
threads = []
for i in range(20):
thread = threading.Thread(None, self.__ConnectAndGenerateError)

View File

@ -271,3 +271,17 @@ class TestStringVar(BaseTestCase):
self.assertEqual(self.cursor.fetchone(), self.dataByKey[4])
self.assertEqual(self.cursor.fetchone(), None)
def testSupplementalCharacters(self):
"test that binding and fetching supplemental charcters works correctly"
supplementalChars = "𠜎 𠜱 𠝹 𠱓 𠱸 𠲖 𠳏 𠳕 𠴕 𠵼 𠵿 𠸎 𠸏 𠹷 𠺝 " \
"𠺢 𠻗 𠻹 𠻺 𠼭 𠼮 𠽌 𠾴 𠾼 𠿪 𡁜 𡁯 𡁵 𡁶 𡁻 𡃁 𡃉 𡇙 𢃇 " \
"𢞵 𢫕 𢭃 𢯊 𢱑 𢱕 𢳂 𢴈 𢵌 𢵧 𢺳 𣲷 𤓓 𤶸 𤷪 𥄫 𦉘 𦟌 𦧲 " \
"𦧺 𧨾 𨅝 𨈇 𨋢 𨳊 𨳍 𨳒 𩶘"
self.cursor.execute("truncate table TestTempTable")
self.cursor.execute("insert into TestTempTable values (:1, :2)",
(1, supplementalChars))
self.connection.commit()
self.cursor.execute("select StringCol from TestTempTable")
value, = self.cursor.fetchone()
self.assertEqual(value, supplementalChars)

View File

@ -5,22 +5,19 @@ import os
import sys
import unittest
lang = os.environ.get("NLS_LANG")
if lang is None:
os.environ["NLS_LANG"] = ".UTF8"
if sys.version_info[0] < 3:
input = raw_input
def GetValue(name, label):
value = os.environ.get("CX_ORACLE_" + name)
def GetValue(name, label, defaultValue = None):
value = os.environ.get("CX_ORACLE_" + name, defaultValue)
if value is None:
value = input(label + ": ")
if hasattr(cx_Oracle, "UNICODE") or sys.version_info[0] >= 3:
return value
return unicode(value)
return value
USERNAME = GetValue("USERNAME", "user name")
PASSWORD = GetValue("PASSWORD", "password")
TNSENTRY = GetValue("TNSENTRY", "TNS entry")
ENCODING = GetValue("ENCODING", "encoding", "UTF-8")
NENCODING = GetValue("NENCODING", "national encoding", "UTF-8")
ARRAY_SIZE = int(GetValue("ARRAY_SIZE", "array size"))

View File

@ -62,7 +62,8 @@ class BaseTestCase(unittest.TestCase):
import cx_Oracle
import TestEnv
self.connection = cx_Oracle.connect(TestEnv.USERNAME,
TestEnv.PASSWORD, TestEnv.TNSENTRY)
TestEnv.PASSWORD, TestEnv.TNSENTRY,
encoding = TestEnv.ENCODING, nencoding = TestEnv.NENCODING)
self.cursor = self.connection.cursor()
self.cursor.arraysize = TestEnv.ARRAY_SIZE
@ -84,6 +85,8 @@ for name in moduleNames:
setattr(module, "USERNAME", TestEnv.USERNAME)
setattr(module, "PASSWORD", TestEnv.PASSWORD)
setattr(module, "TNSENTRY", TestEnv.TNSENTRY)
setattr(module, "ENCODING", TestEnv.ENCODING)
setattr(module, "NENCODING", TestEnv.NENCODING)
setattr(module, "ARRAY_SIZE", TestEnv.ARRAY_SIZE)
setattr(module, "TestCase", unittest.TestCase)
setattr(module, "BaseTestCase", BaseTestCase)

View File

@ -53,8 +53,8 @@ class TestConnection(TestCase):
def testMakeDSN(self):
"test making a data source name from host, port and sid"
formatString = u"(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=" \
u"(PROTOCOL=TCP)(HOST=%s)(PORT=%d)))(CONNECT_DATA=(SID=%s)))"
formatString = u"(DESCRIPTION=(ADDRESS=" \
u"(PROTOCOL=TCP)(HOST=%s)(PORT=%d))(CONNECT_DATA=(SID=%s)))"
args = (u"hostname", 1521, u"TEST")
result = cx_Oracle.makedsn(*args)
self.assertEqual(result, formatString % args)