Improved samples and test suite.

This commit is contained in:
Anthony Tuininga 2021-07-26 13:31:57 -06:00
parent c665d2efca
commit 440163efe5
50 changed files with 426 additions and 240 deletions

View File

@ -17,6 +17,7 @@ Version 8.3 (TBD)
implicit conversion to integer has become an error in Python 3.10) and
values that are not `int`, `float` or `decimal.Decimal` are explicitly
rejected.
#) Improved samples and test suite.
Version 8.2.1 (June 2021)

View File

@ -10,8 +10,8 @@
#------------------------------------------------------------------------------
# bulk_aq.py
# This script demonstrates how to use bulk enqueuing and dequeuing of
# messages with advanced queuing using cx_Oracle. It makes use of a RAW queue
# created in the sample setup.
# messages with advanced queuing. It makes use of a RAW queue created in the
# sample setup.
#
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------

View File

@ -4,8 +4,8 @@
#------------------------------------------------------------------------------
# connection_pool.py
# This script demonstrates the use of connection pooling in cx_Oracle. Pools
# can significantly reduce connection times for long running applications that
# This script demonstrates the use of connection pooling. Pools can
# significantly reduce connection times for long running applications that
# repeatedly open and close connections. Internal features help protect against
# dead connections, and also aid use of Oracle Database features such as FAN
# and Application Continuity.
@ -27,8 +27,8 @@ import sample_env
# Create a Connection Pool
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2,
max=5, increment=1)
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1)
def the_long_query():
with pool.acquire() as conn:

View File

@ -12,7 +12,7 @@
# This script demonstrates using continuous query notification in Python, a
# feature that is available in Oracle 11g and later. Once this script is
# running, use another session to insert, update or delete rows from the table
# cx_Oracle.TestTempTable and you will see the notification of that change.
# TestTempTable and you will see the notification of that change.
#
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------

View File

@ -12,7 +12,7 @@
# This script demonstrates using database change notification in Python, a
# feature that is available in Oracle 10g Release 2. Once this script is
# running, use another session to insert, update or delete rows from the table
# cx_Oracle.TestTempTable and you will see the notification of that change.
# TestTempTable and you will see the notification of that change.
#
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------

View File

@ -10,6 +10,7 @@
#------------------------------------------------------------------------------
import collections
import cx_Oracle as oracledb
import sample_env
@ -21,7 +22,7 @@ class Connection(oracledb.Connection):
class Cursor(oracledb.Cursor):
def execute(self, statement, args = None):
def execute(self, statement, args=None):
prepare_needed = (self.statement != statement)
result = super().execute(statement, args or [])
if prepare_needed:

View File

@ -15,8 +15,9 @@
#
#------------------------------------------------------------------------------
import sys
import json
import sys
import cx_Oracle as oracledb
import sample_env

View File

@ -10,8 +10,9 @@
# For JSON with older databases see json_blob.py
#------------------------------------------------------------------------------
import sys
import json
import sys
import cx_Oracle as oracledb
import sample_env

View File

@ -9,8 +9,8 @@
#------------------------------------------------------------------------------
# multi_consumer_aq.py
# This script demonstrates how to use multi-consumer advanced queuing using
# cx_Oracle. It makes use of a RAW queue created in the sample setup.
# This script demonstrates how to use multi-consumer advanced queuing. It
# makes use of a RAW queue created in the sample setup.
#
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------

View File

@ -9,9 +9,8 @@
#------------------------------------------------------------------------------
# object_aq.py
# This script demonstrates how to use advanced queuing with objects using
# cx_Oracle. It makes use of a simple type and queue created in the sample
# setup.
# This script demonstrates how to use advanced queuing with objects. It makes
# use of a simple type and queue created in the sample setup.
#
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------

View File

@ -7,8 +7,7 @@
#
# Demonstrate how to bind (in and out) a PL/SQL record.
#
# This feature is new in cx_Oracle 5.3 and is only available in Oracle
# Database 12.1 and higher.
# This feature is only available in Oracle Database 12.1 and higher.
#------------------------------------------------------------------------------
import datetime

View File

@ -9,8 +9,8 @@
#------------------------------------------------------------------------------
# raw_aq.py
# This script demonstrates how to use advanced queuing with RAW data using
# cx_Oracle. It makes use of a RAW queue created in the sample setup.
# This script demonstrates how to use advanced queuing with RAW data. It
# makes use of a RAW queue created in the sample setup.
#
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------

View File

@ -4,7 +4,7 @@
#------------------------------------------------------------------------------
# ref_cursor.py
# Demonstrates the use of REF cursors with cx_Oracle.
# Demonstrates the use of REF cursors.
#------------------------------------------------------------------------------
import cx_Oracle as oracledb

View File

@ -3,9 +3,9 @@
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# Sets the environment used by most Python cx_Oracle samples. Production
# applications should consider using External Authentication to
# avoid hard coded credentials.
# Sets the environment used by the sample scripts. Production applications
# should consider using External Authentication to avoid hard coded
# credentials.
#
# You can set values in environment variables to bypass the sample requesting
# the information it requires.
@ -24,8 +24,8 @@
# Net Service Name from a tnsnames.ora file or external naming service,
# or it can be the name of a local Oracle database instance.
#
# If cx_Oracle is using Instant Client, then an Easy Connect string is
# generally appropriate. The syntax is:
# If using Instant Client, then an Easy Connect string is generally
# appropriate. The syntax is:
#
# [//]host_name[:port][/service_name][:server_type][/instance_name]
#

View File

@ -29,7 +29,8 @@ import sample_env
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, session_callback="pkg_SessionCallback.TheCallback")
increment=1,
session_callback="pkg_SessionCallback.TheCallback")
# truncate table logging calls to PL/SQL session callback
with pool.acquire() as conn:

View File

@ -6,7 +6,7 @@
# setup_samples.py
#
# Creates users and populates their schemas with the tables and packages
# necessary for the cx_Oracle samples. An edition is also created for the
# necessary for running the sample scripts. An edition is also created for the
# demonstration of PL/SQL editioning.
#------------------------------------------------------------------------------

View File

@ -5,7 +5,7 @@
/*-----------------------------------------------------------------------------
* setup_samples_exec.sql
* This script performs the actual work of creating and populating the
* schemas with the database objects used by the cx_Oracle samples. An edition
* schemas with the database objects used by the sample scripts. An edition
* is also created for the demonstration of PL/SQL editioning. It is called by
* the setup_samples.sql file after acquiring the necessary parameters and also
* by the Python script setup_samples.py.

View File

@ -5,9 +5,9 @@
#------------------------------------------------------------------------------
# subclassing.py
#
# Demonstrate how to subclass cx_Oracle connections and cursors in order to
# add additional functionality (like logging) or create specialized interfaces
# for paticular applications.
# Demonstrate how to subclass connections and cursors in order to add
# additional functionality (like logging) or create specialized interfaces for
# paticular applications.
#------------------------------------------------------------------------------
import cx_Oracle as oracledb

View File

@ -1,14 +1,14 @@
#------------------------------------------------------------------------------
# Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# drop_test.py
#
# Drops the database objects used for the cx_Oracle test suite.
# Drops the database objects used by the test suite.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import test_env
def drop_tests(conn):
@ -18,6 +18,6 @@ def drop_tests(conn):
proxy_user=test_env.get_proxy_user())
if __name__ == "__main__":
conn = cx_Oracle.connect(test_env.get_admin_connect_string())
conn = oracledb.connect(test_env.get_admin_connect_string())
drop_tests(conn)
print("Done.")

View File

@ -6,16 +6,15 @@
# setup_test.py
#
# Creates users and populates their schemas with the tables and packages
# necessary for the cx_Oracle test suite.
# necessary for the test suite.
#------------------------------------------------------------------------------
import cx_Oracle
import test_env
import cx_Oracle as oracledb
import drop_test
import test_env
# connect as administrative user (usually SYSTEM or ADMIN)
conn = cx_Oracle.connect(test_env.get_admin_connect_string())
conn = oracledb.connect(test_env.get_admin_connect_string())
# drop existing users and editions, if applicable
drop_test.drop_tests(conn)

View File

@ -6,12 +6,12 @@
1000 - Module for testing top-level module methods
"""
import test_env
import cx_Oracle as oracledb
import datetime
import time
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
requires_connection = False

View File

@ -11,12 +11,13 @@
1100 - Module for testing connections
"""
import test_env
import cx_Oracle as oracledb
import random
import string
import threading
import time
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
requires_connection = False
@ -29,6 +30,14 @@ class TestCase(test_env.BaseTestCase):
count, = cursor.fetchone()
self.assertEqual(count, 10)
def __verify_fetched_data(self, connection):
expected_data = [f"String {i + 1}" for i in range(10)]
sql = "select StringCol from TestStrings order by IntCol"
for i in range(5):
with connection.cursor() as cursor:
fetched_data = [s for s, in cursor.execute(sql)]
self.assertEqual(fetched_data, expected_data)
def __verify_args(self, connection):
self.assertEqual(connection.username, test_env.get_main_user(),
"user name differs")
@ -68,9 +77,9 @@ class TestCase(test_env.BaseTestCase):
def test_1102_app_context_negative(self):
"1102 - test invalid use of application context"
self.assertRaises(TypeError, oracledb.connect,
test_env.get_main_user(),
test_env.get_main_password(),
test_env.get_connect_string(),
user=test_env.get_main_user(),
password=test_env.get_main_password(),
dsn=test_env.get_connect_string(),
appcontext=[('userenv', 'action')])
def test_1103_attributes(self):
@ -125,9 +134,9 @@ class TestCase(test_env.BaseTestCase):
def test_1106_bad_password(self):
"1106 - connection to database with bad password"
self.assertRaises(oracledb.DatabaseError, oracledb.connect,
test_env.get_main_user(),
test_env.get_main_password() + "X",
test_env.get_connect_string())
user=test_env.get_main_user(),
password=test_env.get_main_password() + "X",
dsn=test_env.get_connect_string())
def test_1107_change_password(self):
"1107 - test changing password"
@ -138,8 +147,9 @@ class TestCase(test_env.BaseTestCase):
new_password = "".join(sys_random.choice(string.ascii_letters) \
for i in range(20))
connection.changepassword(test_env.get_main_password(), new_password)
cconnection = oracledb.connect(test_env.get_main_user(), new_password,
test_env.get_connect_string())
connection = oracledb.connect(dsn=test_env.get_connect_string(),
user=test_env.get_main_user(),
password=new_password)
connection.changepassword(new_password, test_env.get_main_password())
def test_1108_change_password_negative(self):
@ -220,7 +230,7 @@ class TestCase(test_env.BaseTestCase):
cursor.execute("""
insert into TestTempTable (IntCol, StringCol)
values (:val, null)""", val=int_value)
connection2 = oracledb.connect(handle = connection.handle)
connection2 = oracledb.connect(handle=connection.handle)
cursor = connection2.cursor()
cursor.execute("select IntCol from TestTempTable")
fetched_int_value, = cursor.fetchone()
@ -279,7 +289,7 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(count, 0)
def test_1119_threading(self):
"1119 - connection to database with multiple threads"
"1119 - multiple connections to database with multiple threads"
threads = []
for i in range(20):
thread = threading.Thread(None, self.__connect_and_drop)
@ -465,5 +475,37 @@ class TestCase(test_env.BaseTestCase):
xid = (0x1234, "%032x" % id_, "%032x" % 9)
self.assertRaises(oracledb.DatabaseError, connection.begin, *xid)
def test_1129_threading_single_connection(self):
"1129 - single connection to database with multiple threads"
with test_env.get_connection(threaded=True) as connection:
threads = [threading.Thread(target=self.__verify_fetched_data,
args=(connection,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
def test_1130_cancel(self):
"1130 - test connection cancel"
conn = test_env.get_connection()
sleep_proc_name = "dbms_session.sleep" \
if int(conn.version.split(".")[0]) >= 18 \
else "dbms_lock.sleep"
def perform_cancel():
time.sleep(0.1)
conn.cancel()
thread = threading.Thread(target=perform_cancel)
thread.start()
try:
with conn.cursor() as cursor:
self.assertRaises(oracledb.OperationalError, cursor.callproc,
sleep_proc_name, [2])
finally:
thread.join()
with conn.cursor() as cursor:
cursor.execute("select user from dual")
user, = cursor.fetchone()
self.assertEqual(user, test_env.get_main_user().upper())
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2021 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
@ -11,10 +11,10 @@
1200 - Module for testing cursors
"""
import test_env
import cx_Oracle as oracledb
import decimal
import datetime
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
@ -397,7 +397,7 @@ class TestCase(test_env.BaseTestCase):
cursor.fetchmany()
self.assertTrue(cursor.arraysize > 1,
"array size must exceed 1 for this test to work correctly")
cursor.scroll(1, mode = "absolute")
cursor.scroll(1, mode="absolute")
row = cursor.fetchone()
self.assertEqual(row[0], 1.25)
self.assertEqual(cursor.rowcount, 1)
@ -410,7 +410,7 @@ class TestCase(test_env.BaseTestCase):
select NumberCol
from TestNumbers
order by IntCol""")
cursor.scroll(6, mode = "absolute")
cursor.scroll(6, mode="absolute")
row = cursor.fetchone()
self.assertEqual(row[0], 7.5)
self.assertEqual(cursor.rowcount, 6)
@ -515,9 +515,9 @@ class TestCase(test_env.BaseTestCase):
self.cursor.execute("truncate table TestTempTable")
cursor = self.connection.cursor(scrollable=True)
cursor.execute("select * from TestTempTable")
cursor.scroll(mode = "last")
cursor.scroll(mode="last")
self.assertEqual(cursor.fetchall(), [])
cursor.scroll(mode = "first")
cursor.scroll(mode="first")
self.assertEqual(cursor.fetchall(), [])
self.assertRaises(oracledb.DatabaseError, cursor.scroll, 1,
mode="absolute")
@ -637,8 +637,8 @@ class TestCase(test_env.BaseTestCase):
select IntCol, StringCol
from TestTempTable
order by IntCol""")
self.assertEqual(self.cursor.fetchall(),
[(0, "Value should be 0"), (1, "Value should be 1")])
expected_value = [(0, "Value should be 0"), (1, "Value should be 1")]
self.assertEqual(self.cursor.fetchall(), expected_value)
def test_1259_as_context_manager(self):
"1259 - test using a cursor as a context manager"

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2021 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
@ -11,10 +11,10 @@
1300 - Module for testing cursor variables
"""
import test_env
import sys
import cx_Oracle as oracledb
import sys
import test_env
class TestCase(test_env.BaseTestCase):
@ -27,9 +27,10 @@ class TestCase(test_env.BaseTestCase):
open :cursor for select 'X' StringValue from dual;
end;""",
cursor=cursor)
varchar_ratio, nvarchar_ratio = test_env.get_charset_ratios()
expected_value = [
('STRINGVALUE', oracledb.DB_TYPE_CHAR, 1,
test_env.get_charset_ratio(), None, None, 1)
('STRINGVALUE', oracledb.DB_TYPE_CHAR, 1, varchar_ratio, None,
None, True)
]
self.assertEqual(cursor.description, expected_value)
self.assertEqual(cursor.fetchall(), [('X',)])
@ -39,10 +40,11 @@ class TestCase(test_env.BaseTestCase):
cursor = self.connection.cursor()
self.assertEqual(cursor.description, None)
self.cursor.callproc("pkg_TestRefCursors.TestOutCursor", (2, cursor))
varchar_ratio, nvarchar_ratio = test_env.get_charset_ratios()
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('STRINGCOL', oracledb.DB_TYPE_VARCHAR, 20,
20 * test_env.get_charset_ratio(), None, None, 0)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('STRINGCOL', oracledb.DB_TYPE_VARCHAR, 20, 20 * varchar_ratio,
None, None, False)
]
self.assertEqual(cursor.description, expected_value)
self.assertEqual(cursor.fetchall(), [(1, 'String 1'), (2, 'String 2')])
@ -85,8 +87,9 @@ class TestCase(test_env.BaseTestCase):
from TestNumbers
order by IntCol""")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('CURSORVALUE', oracledb.DB_TYPE_CURSOR, None, None, None, None, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('CURSORVALUE', oracledb.DB_TYPE_CURSOR, None, None, None, None,
True)
]
self.assertEqual(self.cursor.description, expected_value)
for i in range(1, 11):

View File

@ -11,12 +11,12 @@
1400 - Module for testing date/time variables
"""
import test_env
import cx_Oracle as oracledb
import datetime
import time
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
def setUp(self):
@ -56,9 +56,11 @@ class TestCase(test_env.BaseTestCase):
def test_1402_bind_date_in_datetime_var(self):
"1402 - test binding date in a datetime variable"
var = self.cursor.var(oracledb.DATETIME)
dateVal = datetime.date.today()
var.setvalue(0, dateVal)
self.assertEqual(var.getvalue().date(), dateVal)
date_val = datetime.date.today()
var.setvalue(0, date_val)
self.cursor.execute("select :1 from dual", [var])
result, = self.cursor.fetchone()
self.assertEqual(result.date(), date_val)
def test_1403_bind_date_after_string(self):
"1403 - test binding in a date after setting input sizes to a string"
@ -213,9 +215,9 @@ class TestCase(test_env.BaseTestCase):
"1414 - test cursor description is accurate"
self.cursor.execute("select * from TestDates")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('DATECOL', oracledb.DB_TYPE_DATE, 23, None, None, None, 0),
('NULLABLECOL', oracledb.DB_TYPE_DATE, 23, None, None, None, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('DATECOL', oracledb.DB_TYPE_DATE, 23, None, None, None, False),
('NULLABLECOL', oracledb.DB_TYPE_DATE, 23, None, None, None, True)
]
self.assertEqual(self.cursor.description, expected_value)

View File

@ -8,10 +8,10 @@ including the synonyms retained for backwards compatibility. This module also
tests for pickling/unpickling of database types and API types.
"""
import test_env
import pickle
import cx_Oracle as oracledb
import pickle
import test_env
class TestCase(test_env.BaseTestCase):
requires_connection = False

View File

@ -6,9 +6,8 @@
1600 - Module for testing DML returning clauses
"""
import test_env
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):

View File

@ -11,10 +11,10 @@
1700 - Module for testing error objects
"""
import test_env
import pickle
import cx_Oracle as oracledb
import pickle
import test_env
class TestCase(test_env.BaseTestCase):

View File

@ -11,10 +11,10 @@
1800 - Module for testing interval variables
"""
import test_env
import datetime
import cx_Oracle as oracledb
import datetime
import test_env
class TestCase(test_env.BaseTestCase):
@ -119,9 +119,11 @@ class TestCase(test_env.BaseTestCase):
"1807 - test cursor description is accurate"
self.cursor.execute("select * from TestIntervals")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('INTERVALCOL', oracledb.DB_TYPE_INTERVAL_DS, None, None, 2, 6, 0),
('NULLABLECOL', oracledb.DB_TYPE_INTERVAL_DS, None, None, 2, 6, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('INTERVALCOL', oracledb.DB_TYPE_INTERVAL_DS, None, None, 2, 6,
False),
('NULLABLECOL', oracledb.DB_TYPE_INTERVAL_DS, None, None, 2, 6,
True)
]
self.assertEqual(self.cursor.description, expected_value)
@ -151,5 +153,12 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(self.cursor.fetchone(), self.data_by_key[4])
self.assertEqual(self.cursor.fetchone(), None)
def test_1811_bind_and_fetch_negative_interval(self):
"1811 - test binding and fetching a negative interval"
value = datetime.timedelta(days=-1, seconds=86314, microseconds=431152)
self.cursor.execute("select :1 from dual", [value])
result, = self.cursor.fetchone()
self.assertEqual(result, value)
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
@ -11,9 +11,8 @@
1900 - Module for testing LOB (CLOB and BLOB) variables
"""
import test_env
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
@ -38,7 +37,7 @@ class TestCase(test_env.BaseTestCase):
long_string += char * 25000
elif input_type is not db_type:
continue
self.cursor.setinputsizes(long_string = input_type)
self.cursor.setinputsizes(long_string=input_type)
if lob_type == "BLOB":
bind_value = long_string.encode()
else:
@ -134,16 +133,16 @@ class TestCase(test_env.BaseTestCase):
prev_char = chr(ord('A') + integer_value - 2)
long_string += char * 25000
if lob_type == "BLOB":
actualValue = long_string.encode("ascii")
expected_value = long_string.encode("ascii")
char = char.encode("ascii")
prev_char = prev_char.encode("ascii")
else:
actualValue = long_string
self.assertEqual(lob.size(), len(actualValue))
self.assertEqual(lob.read(), actualValue)
expected_value = long_string
self.assertEqual(lob.size(), len(expected_value))
self.assertEqual(lob.read(), expected_value)
if lob_type == "CLOB":
self.assertEqual(str(lob), actualValue)
self.assertEqual(lob.read(len(actualValue)), char)
self.assertEqual(str(lob), expected_value)
self.assertEqual(lob.read(len(expected_value)), char)
if integer_value > 1:
offset = (integer_value - 1) * 25000 - 4
string = prev_char * 5 + char * 5
@ -183,8 +182,8 @@ class TestCase(test_env.BaseTestCase):
"1905 - test cursor description is accurate for CLOBs"
self.cursor.execute("select * from TestCLOBs")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('CLOBCOL', oracledb.DB_TYPE_CLOB, None, None, None, None, 0)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('CLOBCOL', oracledb.DB_TYPE_CLOB, None, None, None, None, False)
]
self.assertEqual(self.cursor.description, expected_value)
@ -232,24 +231,20 @@ class TestCase(test_env.BaseTestCase):
"1914 - test binding and fetching NCLOB data (directly)"
self.__perform_test("NCLOB", oracledb.DB_TYPE_NCLOB)
def test_1915_nclob_different_encodings(self):
"1915 - test binding and fetching NCLOB data (different encodings)"
connection = oracledb.connect(test_env.get_main_user(),
test_env.get_main_password(),
test_env.get_connect_string(),
encoding="UTF-8", nencoding="UTF-16")
def test_1915_nclob_non_ascii_chars(self):
"1915 - test binding and fetching NCLOB data (with non-ASCII chars)"
value = "\u03b4\u4e2a"
cursor = connection.cursor()
cursor.execute("truncate table TestNCLOBs")
cursor.setinputsizes(val=oracledb.DB_TYPE_NVARCHAR)
cursor.execute("insert into TestNCLOBs values (1, :val)", val=value)
cursor.execute("select NCLOBCol from TestNCLOBs")
nclob, = cursor.fetchone()
cursor.setinputsizes(val=oracledb.DB_TYPE_NVARCHAR)
cursor.execute("update TestNCLOBs set NCLOBCol = :val",
val=nclob.read() + value)
cursor.execute("select NCLOBCol from TestNCLOBs")
nclob, = cursor.fetchone()
self.cursor.execute("truncate table TestNCLOBs")
self.cursor.setinputsizes(val=oracledb.DB_TYPE_NVARCHAR)
self.cursor.execute("insert into TestNCLOBs values (1, :val)",
val=value)
self.cursor.execute("select NCLOBCol from TestNCLOBs")
nclob, = self.cursor.fetchone()
self.cursor.setinputsizes(val=oracledb.DB_TYPE_NVARCHAR)
self.cursor.execute("update TestNCLOBs set NCLOBCol = :val",
val=nclob.read() + value)
self.cursor.execute("select NCLOBCol from TestNCLOBs")
nclob, = self.cursor.fetchone()
self.assertEqual(nclob.read(), value + value)
def test_1916_nclob_indirect(self):
@ -280,7 +275,7 @@ class TestCase(test_env.BaseTestCase):
temp_lobs = self.__get_temp_lobs(sid)
self.assertEqual(temp_lobs, 0)
def test_1919_AssignStringBeyondArraySize(self):
def test_1919_assign_string_beyond_array_size(self):
"1919 - test assign string to NCLOB beyond array size"
nclobVar = self.cursor.var(oracledb.DB_TYPE_NCLOB)
self.assertRaises(IndexError, nclobVar.setvalue, 1, "test char")

View File

@ -11,9 +11,8 @@
2000 - Module for testing long and long raw variables
"""
import test_env
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
@ -82,18 +81,20 @@ class TestCase(test_env.BaseTestCase):
"2003 - test cursor description is accurate for longs"
self.cursor.execute("select * from TestLongs")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('LONGCOL', oracledb.DB_TYPE_LONG, None, None, None, None, 0)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('LONGCOL', oracledb.DB_TYPE_LONG, None, None, None, None, False)
]
self.assertEqual(self.cursor.description, expected_value)
def test_2004_long_raw_cursor_description(self):
"2004 - test cursor description is accurate for long raws"
self.cursor.execute("select * from TestLongRaws")
self.assertEqual(self.cursor.description,
[ ('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('LONGRAWCOL', oracledb.DB_TYPE_LONG_RAW, None, None, None,
None, 0) ])
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('LONGRAWCOL', oracledb.DB_TYPE_LONG_RAW, None, None, None, None,
False)
]
self.assertEqual(self.cursor.description, expected_value)
def test_2005_array_size_too_large(self):
"2005 - test array size too large generates an exception"

View File

@ -11,9 +11,8 @@
2100 - Module for testing NCHAR variables
"""
import test_env
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
@ -202,11 +201,15 @@ class TestCase(test_env.BaseTestCase):
def test_2114_cursor_description(self):
"2114 - test cursor description is accurate"
self.cursor.execute("select * from TestUnicodes")
varchar_ratio, nvarchar_ratio = test_env.get_charset_ratios()
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('UNICODECOL', oracledb.DB_TYPE_NVARCHAR, 20, 80, None, None, 0),
('FIXEDUNICODECOL', oracledb.DB_TYPE_NCHAR, 40, 160, None, None, 0),
('NULLABLECOL', oracledb.DB_TYPE_NVARCHAR, 50, 200, None, None, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('UNICODECOL', oracledb.DB_TYPE_NVARCHAR, 20, 20 * nvarchar_ratio,
None, None, False),
('FIXEDUNICODECOL', oracledb.DB_TYPE_NCHAR, 40,
40 * nvarchar_ratio, None, None, False),
('NULLABLECOL', oracledb.DB_TYPE_NVARCHAR, 50, 50 * nvarchar_ratio,
None, None, True)
]
self.assertEqual(self.cursor.description, expected_value)

View File

@ -11,12 +11,12 @@
2200 - Module for testing number variables
"""
import test_env
import cx_Oracle as oracledb
import decimal
import sys
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
def output_type_handler_binary_int(self, cursor, name, default_type, size,
@ -273,13 +273,13 @@ class TestCase(test_env.BaseTestCase):
"2220 - test cursor description is accurate"
self.cursor.execute("select * from TestNumbers")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('LONGINTCOL', oracledb.DB_TYPE_NUMBER, 17, None, 16, 0, 0),
('NUMBERCOL', oracledb.DB_TYPE_NUMBER, 13, None, 9, 2, 0),
('FLOATCOL', oracledb.DB_TYPE_NUMBER, 127, None, 126, -127, 0),
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('LONGINTCOL', oracledb.DB_TYPE_NUMBER, 17, None, 16, 0, False),
('NUMBERCOL', oracledb.DB_TYPE_NUMBER, 13, None, 9, 2, False),
('FLOATCOL', oracledb.DB_TYPE_NUMBER, 127, None, 126, -127, False),
('UNCONSTRAINEDCOL', oracledb.DB_TYPE_NUMBER, 127, None, 0, -127,
0),
('NULLABLECOL', oracledb.DB_TYPE_NUMBER, 39, None, 38, 0, 1)
False),
('NULLABLECOL', oracledb.DB_TYPE_NUMBER, 39, None, 38, 0, True)
]
self.assertEqual(self.cursor.description, expected_value)

View File

@ -11,12 +11,12 @@
2300 - Module for testing object variables
"""
import test_env
import cx_Oracle as oracledb
import datetime
import decimal
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
def __test_data(self, expected_int_value, expected_obj_value,
@ -103,9 +103,10 @@ class TestCase(test_env.BaseTestCase):
from TestObjects
order by IntCol""")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('OBJECTCOL', oracledb.DB_TYPE_OBJECT, None, None, None, None, 1),
('ARRAYCOL', oracledb.DB_TYPE_OBJECT, None, None, None, None, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('OBJECTCOL', oracledb.DB_TYPE_OBJECT, None, None, None, None,
True),
('ARRAYCOL', oracledb.DB_TYPE_OBJECT, None, None, None, None, True)
]
self.assertEqual(self.cursor.description, expected_value)
expected_value = (

View File

@ -32,6 +32,44 @@ class TestCase(test_env.BaseTestCase):
self.assertRaises(oracledb.DatabaseError, cursor.execute,
"select 1 / 0 from dual")
def __callable_session_callback(self, conn, requested_tag):
self.session_called = True
supported_formats = {
"SIMPLE" : "'YYYY-MM-DD HH24:MI'",
"FULL" : "'YYYY-MM-DD HH24:MI:SS'"
}
supported_time_zones = {
"UTC" : "'UTC'",
"MST" : "'-07:00'"
}
supported_keys = {
"NLS_DATE_FORMAT" : supported_formats,
"TIME_ZONE" : supported_time_zones
}
if requested_tag is not None:
state_parts = []
for directive in requested_tag.split(";"):
parts = directive.split("=")
if len(parts) != 2:
raise ValueError("Tag must contain key=value pairs")
key, value = parts
value_dict = supported_keys.get(key)
if value_dict is None:
raise ValueError("Tag only supports keys: %s" % \
(", ".join(supported_keys)))
actual_value = value_dict.get(value)
if actual_value is None:
raise ValueError("Key %s only supports values: %s" % \
(key, ", ".join(value_dict)))
state_parts.append("%s = %s" % (key, actual_value))
sql = "alter session set %s" % " ".join(state_parts)
cursor = conn.cursor()
cursor.execute(sql)
conn.tag = requested_tag
def __perform_reconfigure_test(self, parameter_name, parameter_value,
min=3, max=30, increment=4, timeout=5,
wait_timeout=5000, stmtcachesize=25,
@ -357,7 +395,7 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(pool.opened, 1, "opened (4)")
def test_2414_create_new_pure_connection(self):
"2414 - test to ensure pure connections and being created correctly"
"2414 - test to ensure pure connections are being created correctly"
pool = test_env.get_pool(min=1, max=2, increment=1,
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
connection_1 = pool.acquire()
@ -458,5 +496,63 @@ class TestCase(test_env.BaseTestCase):
getmode=oracledb.SPOOL_ATTRVAL_NOWAIT,
session_callback=callback, sessionCallback=callback)
def test_2419_statement_cache_size(self):
"2419 - test to verify statement cache size is retained"
pool = test_env.get_pool(min=1, max=2, increment=1,
getmode=oracledb.SPOOL_ATTRVAL_WAIT,
stmtcachesize=25)
self.assertEqual(pool.stmtcachesize, 25, "stmtcachesize (25)")
pool.stmtcachesize = 35
self.assertEqual(pool.stmtcachesize, 35, "stmtcachesize (35)")
def test_2420_callable_session_callbacks(self):
"2420 - test that session callbacks are being called correctly"
pool = test_env.get_pool(min=2, max=5, increment=1,
session_callback=self.__callable_session_callback)
# new connection with a tag should invoke the session callback
with pool.acquire(tag="NLS_DATE_FORMAT=SIMPLE") as conn:
cursor = conn.cursor()
cursor.execute("select to_char(2021-05-20) from dual")
result, = cursor.fetchone()
self.assertEqual(self.session_called, True)
# acquiring a connection with the same tag should not invoke the
# session callback
self.session_called = False
with pool.acquire(tag="NLS_DATE_FORMAT=SIMPLE") as conn:
cursor = conn.cursor()
cursor.execute("select to_char(2021-05-20) from dual")
result, = cursor.fetchone()
self.assertEqual(self.session_called, False)
# acquiring a connection with a new tag should invoke the session
# callback
self.session_called = False
with pool.acquire(tag="NLS_DATE_FORMAT=FULL;TIME_ZONE=UTC") as conn:
cursor = conn.cursor()
cursor.execute("select to_char(current_date) from dual")
result, = cursor.fetchone()
self.assertEqual(self.session_called, True)
# acquiring a connection with a new tag and specifying that a
# connection with any tag can be acquired should invoke the session
# callback
self.session_called = False
with pool.acquire(tag="NLS_DATE_FORMAT=FULL;TIME_ZONE=MST", \
matchanytag=True) as conn:
cursor = conn.cursor()
cursor.execute("select to_char(current_date) from dual")
result, = cursor.fetchone()
self.assertEqual(self.session_called, True)
# new session with no tag should invoke the session callback
self.session_called = False
with pool.acquire() as conn:
cursor = conn.cursor()
cursor.execute("select to_char(current_date) from dual")
result, = cursor.fetchone()
self.assertEqual(self.session_called, True)
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2021 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
@ -11,12 +11,12 @@
2500 - Module for testing string variables
"""
import test_env
import datetime
import random
import string
import cx_Oracle as oracledb
import datetime
import string
import random
import test_env
class TestCase(test_env.BaseTestCase):
@ -297,15 +297,16 @@ class TestCase(test_env.BaseTestCase):
def test_2522_cursor_description(self):
"2522 - test cursor description is accurate"
self.cursor.execute("select * from TestStrings")
varchar_ratio, nvarchar_ratio = test_env.get_charset_ratios()
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('STRINGCOL', oracledb.DB_TYPE_VARCHAR, 20,
20 * test_env.get_charset_ratio(), None, None, 0),
('RAWCOL', oracledb.DB_TYPE_RAW, 30, 30, None, None, 0),
('FIXEDCHARCOL', oracledb.DB_TYPE_CHAR, 40,
40 * test_env.get_charset_ratio(), None, None, 0),
('NULLABLECOL', oracledb.DB_TYPE_VARCHAR, 50,
50 * test_env.get_charset_ratio(), None, None, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('STRINGCOL', oracledb.DB_TYPE_VARCHAR, 20, 20 * varchar_ratio,
None, None, False),
('RAWCOL', oracledb.DB_TYPE_RAW, 30, 30, None, None, False),
('FIXEDCHARCOL', oracledb.DB_TYPE_CHAR, 40, 40 * varchar_ratio,
None, None, False),
('NULLABLECOL', oracledb.DB_TYPE_VARCHAR, 50, 50 * varchar_ratio,
None, None, True)
]
self.assertEqual(self.cursor.description, expected_value)
@ -426,6 +427,7 @@ class TestCase(test_env.BaseTestCase):
random_string = ''.join(random.choice(chars) for _ in range(1024))
int_val = 200
xml_string = '<data>' + random_string + '</data>'
self.cursor.execute("truncate table TestXML")
self.cursor.execute("""
insert into TestXML (IntCol, XMLCol)
values (:1, :2)""", (int_val, xml_string))
@ -451,11 +453,19 @@ class TestCase(test_env.BaseTestCase):
self.cursor.execute("truncate table TestTempTable")
string_val = "I bought a cafetière on the Champs-Élysées"
sql = "insert into TestTempTable (IntCol, StringCol) values (:1, :2)"
self.cursor.execute(sql, (1, string_val))
self.cursor.outputtypehandler = self.__return_strings_as_bytes
self.cursor.execute("select IntCol, StringCol from TestTempTable")
expected_value = (1, string_val.encode())
self.assertEqual(self.cursor.fetchone(), expected_value)
with self.connection.cursor() as cursor:
cursor.execute(sql, (1, string_val))
cursor.execute("select IntCol, StringCol from TestTempTable")
self.assertEqual(cursor.fetchone(), (1, string_val))
with self.connection.cursor() as cursor:
cursor.outputtypehandler = self.__return_strings_as_bytes
cursor.execute("select IntCol, StringCol from TestTempTable")
expected_value = (1, string_val.encode())
self.assertEqual(cursor.fetchone(), (1, string_val.encode()))
with self.connection.cursor() as cursor:
cursor.outputtypehandler = None
cursor.execute("select IntCol, StringCol from TestTempTable")
self.assertEqual(cursor.fetchone(), (1, string_val))
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -11,10 +11,10 @@
2600 - Module for testing timestamp variables
"""
import test_env
import time
import cx_Oracle as oracledb
import time
import test_env
class TestCase(test_env.BaseTestCase):
@ -115,9 +115,10 @@ class TestCase(test_env.BaseTestCase):
"2606 - test cursor description is accurate"
self.cursor.execute("select * from TestTimestamps")
expected_value = [
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, 0),
('TIMESTAMPCOL', oracledb.DB_TYPE_TIMESTAMP, 23, None, 0, 6, 0),
('NULLABLECOL', oracledb.DB_TYPE_TIMESTAMP, 23, None, 0, 6, 1)
('INTCOL', oracledb.DB_TYPE_NUMBER, 10, None, 9, 0, False),
('TIMESTAMPCOL', oracledb.DB_TYPE_TIMESTAMP, 23, None, 0, 6,
False),
('NULLABLECOL', oracledb.DB_TYPE_TIMESTAMP, 23, None, 0, 6, True)
]
self.assertEqual(self.cursor.description, expected_value)

View File

@ -6,12 +6,12 @@
2700 - Module for testing AQ
"""
import test_env
import cx_Oracle as oracledb
import decimal
import threading
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
book_type_name = "UDT_BOOK"
book_queue_name = "TEST_BOOK_QUEUE"
@ -23,23 +23,21 @@ class TestCase(test_env.BaseTestCase):
def __clear_books_queue(self):
books_type = self.connection.gettype(self.book_type_name)
book = books_type.newobject()
options = self.connection.deqoptions()
options.wait = oracledb.DEQ_NO_WAIT
options.deliverymode = oracledb.MSG_PERSISTENT_OR_BUFFERED
options.visibility = oracledb.ENQ_IMMEDIATE
props = self.connection.msgproperties()
while self.connection.deq(self.book_queue_name, options, props, book):
queue = self.connection.queue(self.book_queue_name, books_type)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.deliverymode = oracledb.MSG_PERSISTENT_OR_BUFFERED
queue.deqoptions.visibility = oracledb.DEQ_IMMEDIATE
while queue.deqone():
pass
def __deq_in_thread(self, results):
connection = test_env.get_connection()
connection = test_env.get_connection(threaded=True)
books_type = connection.gettype(self.book_type_name)
book = books_type.newobject()
options = connection.deqoptions()
options.wait = 10
props = connection.msgproperties()
if connection.deq(self.book_queue_name, options, props, book):
queue = connection.queue(self.book_queue_name, books_type)
queue.deqoptions.wait = 10
props = queue.deqone()
if props is not None:
book = props.payload
results.append((book.TITLE, book.AUTHORS, book.PRICE))
connection.commit()
@ -122,8 +120,7 @@ class TestCase(test_env.BaseTestCase):
"2704 - test waiting for dequeue"
self.__clear_books_queue()
results = []
thread = threading.Thread(target = self.__deq_in_thread,
args = (results,))
thread = threading.Thread(target=self.__deq_in_thread, args=(results,))
thread.start()
books_type = self.connection.gettype(self.book_type_name)
book = books_type.newobject()

View File

@ -6,12 +6,12 @@
2800 - Module for testing AQ Bulk enqueue/dequeue
"""
import test_env
import cx_Oracle as oracledb
import decimal
import threading
import cx_Oracle as oracledb
import test_env
RAW_QUEUE_NAME = "TEST_RAW_QUEUE"
RAW_PAYLOAD_DATA = [
"The first message",

View File

@ -6,9 +6,8 @@
2900 - Module for testing Rowids
"""
import test_env
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):

View File

@ -1,15 +1,15 @@
#------------------------------------------------------------------------------
# Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
"""
3000 - Module for testing subscriptions
"""
import test_env
import threading
import cx_Oracle as oracledb
import threading
import test_env
class SubscriptionData(object):

View File

@ -12,9 +12,9 @@
"""
import unittest
import test_env
import cx_Oracle as oracledb
import test_env
@unittest.skipUnless(test_env.get_client_version() >= (12, 1),
"unsupported client")

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
@ -11,12 +11,12 @@
3200 - Module for testing features introduced in 12.1
"""
import test_env
import cx_Oracle as oracledb
import datetime
import unittest
import cx_Oracle as oracledb
import test_env
@unittest.skipUnless(test_env.get_client_version() >= (12, 1),
"unsupported client")
class TestCase(test_env.BaseTestCase):
@ -360,7 +360,6 @@ class TestCase(test_env.BaseTestCase):
c1 sys_refcursor;
c2 sys_refcursor;
begin
open c1 for
select NumberCol
from TestNumbers
@ -374,7 +373,6 @@ class TestCase(test_env.BaseTestCase):
where IntCol between 7 and 10;
dbms_sql.return_result(c2);
end;""")
results = self.cursor.getimplicitresults()
self.assertEqual(len(results), 2)
@ -469,5 +467,26 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(self.cursor.getarraydmlrowcounts(), [1, 2, 0, 0, 1])
self.assertEqual(self.cursor.rowcount, 4)
def test_3225_implicit_results(self):
"3225 - test using implicit cursors to execute new statements"
cursor = self.connection.cursor()
cursor.execute("""
declare
c1 sys_refcursor;
begin
open c1 for
select NumberCol
from TestNumbers
where IntCol between 3 and 5;
dbms_sql.return_result(c1);
end;""")
results = cursor.getimplicitresults()
self.assertEqual(len(results), 1)
self.assertEqual([n for n, in results[0]], [3.75, 5, 6.25])
results[0].execute("select :1 from dual", (7,))
row, = results[0].fetchone()
self.assertEqual(row, 7)
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -6,12 +6,12 @@
3300 - Module for testing Simple Oracle Document Access (SODA) Database
"""
import test_env
import cx_Oracle as oracledb
import json
import unittest
import cx_Oracle as oracledb
import test_env
@unittest.skipIf(test_env.skip_soda_tests(),
"unsupported client/server combination")
class TestCase(test_env.BaseTestCase):

View File

@ -1,15 +1,15 @@
#------------------------------------------------------------------------------
# Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
"""
3400 - Module for testing Simple Oracle Document Access (SODA) Collections
"""
import test_env
import unittest
import cx_Oracle as oracledb
import unittest
import test_env
@unittest.skipIf(test_env.skip_soda_tests(),
"unsupported client/server combination")
@ -148,7 +148,7 @@ class TestCase(test_env.BaseTestCase):
self.connection.commit()
coll.drop()
def test_3406_CreateAndDropIndex(self):
def test_3406_create_and_drop_index(self):
"3406 - test create and drop Index"
index_name = "cxoTestIndexes_ix_1"
index_spec = {

View File

@ -6,12 +6,13 @@
3500 - Module for testing the JSON data type.
"""
import cx_Oracle as oracledb
import test_env
import datetime
import decimal
import unittest
import cx_Oracle as oracledb
import test_env
@unittest.skipUnless(test_env.get_client_version() >= (21, 0),
"unsupported client")
@unittest.skipUnless(test_env.get_server_version() >= (21, 0),
@ -77,7 +78,7 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(result, self.json_data)
def test_3501_execute_with_dml_returning(self):
"3502 - inserting single rows with JSON and DML returning"
"3501 - inserting single rows with JSON and DML returning"
json_val = self.json_data[11]
self.cursor.execute("truncate table TestJson")
json_out = self.cursor.var(oracledb.DB_TYPE_JSON)
@ -116,7 +117,7 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(out_json_var.values, [[v] for v in self.json_data])
def test_3504_boolean(self):
"3509 - test binding boolean values as scalar JSON values"
"3504 - test binding boolean values as scalar JSON values"
data = [
True,
False,
@ -128,7 +129,7 @@ class TestCase(test_env.BaseTestCase):
self.__bind_scalar_as_json(data)
def test_3505_strings_and_bytes(self):
"3509 - test binding strings/bytes values as scalar JSON values"
"3505 - test binding strings/bytes values as scalar JSON values"
data = [
"String 1",
b"A raw value",

View File

@ -6,14 +6,14 @@
3600 - Module for testing the conversions of outputtype handler.
"""
import test_env
import cx_Oracle as oracledb
import datetime
import decimal
import sys
import datetime
import unittest
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
def __test_type_handler(self, input_type, output_type, in_value,

View File

@ -6,13 +6,14 @@
3700 - Module for testing all variable types.
"""
import test_env
import cx_Oracle as oracledb
import decimal
import datetime
import decimal
import time
import unittest
import cx_Oracle as oracledb
import test_env
class TestCase(test_env.BaseTestCase):
def _test_positive_set_and_get(self, var_type, value_to_set,

View File

@ -6,10 +6,10 @@
3800 - Module for testing the input and output type handlers.
"""
import test_env
import json
import cx_Oracle as oracledb
import json
import test_env
class Building(object):

View File

@ -44,13 +44,13 @@
# user for on premises databases is SYSTEM.
#------------------------------------------------------------------------------
import cx_Oracle as oracledb
import getpass
import os
import sys
import unittest
import cx_Oracle as oracledb
# default values
DEFAULT_MAIN_USER = "pythontest"
DEFAULT_PROXY_USER = "pythontestproxy"
@ -86,14 +86,19 @@ def get_admin_connect_string():
"Password for %s" % admin_user)
return "%s/%s@%s" % (admin_user, admin_password, get_connect_string())
def get_charset_ratio():
def get_charset_ratios():
value = PARAMETERS.get("CS_RATIO")
if value is None:
connection = get_connection()
cursor = connection.cursor()
cursor.execute("select 'X' from dual")
column_info, = cursor.description
value = PARAMETERS["CS_RATIO"] = column_info[3]
cursor.execute("""
select
cast('X' as varchar2(1)),
cast('Y' as nvarchar2(1))
from dual""")
varchar_column_info, nvarchar_column_info = cursor.description
value = (varchar_column_info[3], nvarchar_column_info[3])
PARAMETERS["CS_RATIO"] = value
return value
def get_client_version():