Add more tests.

This commit is contained in:
Anthony Tuininga 2021-09-03 10:57:19 -06:00
parent 438c885c20
commit 05a9097847
3 changed files with 125 additions and 4 deletions

View File

@ -15,6 +15,7 @@ import random
import string
import threading
import time
import unittest
import cx_Oracle as oracledb
import test_env
@ -488,9 +489,6 @@ class TestCase(test_env.BaseTestCase):
def test_1130_cancel(self):
"1130 - test connection cancel"
conn = test_env.get_connection()
sleep_proc_name = "dbms_session.sleep" \
if int(conn.version.split(".")[0]) >= 18 \
else "dbms_lock.sleep"
def perform_cancel():
time.sleep(0.1)
conn.cancel()
@ -499,7 +497,7 @@ class TestCase(test_env.BaseTestCase):
try:
with conn.cursor() as cursor:
self.assertRaises(oracledb.OperationalError, cursor.callproc,
sleep_proc_name, [2])
test_env.get_sleep_proc_name(), [2])
finally:
thread.join()
with conn.cursor() as cursor:
@ -507,5 +505,89 @@ class TestCase(test_env.BaseTestCase):
user, = cursor.fetchone()
self.assertEqual(user, test_env.get_main_user().upper())
def test_1131_change_password_during_connect(self):
"1131 - test changing password during connect"
connection = test_env.get_connection()
if self.is_on_oracle_cloud(connection):
self.skipTest("passwords on Oracle Cloud are strictly controlled")
sys_random = random.SystemRandom()
new_password = "".join(sys_random.choice(string.ascii_letters) \
for i in range(20))
connection = oracledb.connect(dsn=test_env.get_connect_string(),
user=test_env.get_main_user(),
password=test_env.get_main_password(),
newpassword=new_password)
connection = oracledb.connect(dsn=test_env.get_connect_string(),
user=test_env.get_main_user(),
password=new_password)
connection.changepassword(new_password, test_env.get_main_password())
def test_1132_autocommit_during_reexecute(self):
"1132 - test use of autocommit during reexecute"
sql = "insert into TestTempTable (IntCol, StringCol) values (:1, :2)"
data_to_insert = [
(1, "Test String #1"),
(2, "Test String #2")
]
connection = test_env.get_connection()
cursor = connection.cursor()
other_connection = test_env.get_connection()
other_cursor = other_connection.cursor()
cursor.execute("truncate table TestTempTable")
cursor.execute(sql, data_to_insert[0])
other_cursor.execute("select IntCol, StringCol from TestTempTable")
rows = other_cursor.fetchall()
self.assertEqual(rows, [])
connection.autocommit = True
cursor.execute(sql, data_to_insert[1])
other_cursor.execute("select IntCol, StringCol from TestTempTable")
rows = other_cursor.fetchall()
self.assertEqual(rows, data_to_insert)
def test_1133_current_schema(self):
"1133 - test current_schame is set properly"
conn = test_env.get_connection()
self.assertEqual(conn.current_schema, None)
user = test_env.get_main_user().upper()
proxy_user = test_env.get_proxy_user().upper()
cursor = conn.cursor()
cursor.execute(f'alter session set current_schema={proxy_user}')
self.assertEqual(conn.current_schema, proxy_user)
conn.current_schema = user
self.assertEqual(conn.current_schema, user)
cursor.execute("""
select sys_context('userenv', 'current_schema')
from dual""")
result, = cursor.fetchone()
self.assertEqual(result, user)
def test_1134_dbms_output(self):
"1134 - test dbms_output package"
conn = test_env.get_connection()
cursor = conn.cursor()
test_string = "Testing DBMS_OUTPUT package"
cursor.callproc("dbms_output.enable")
cursor.execute("""
begin
dbms_output.put_line(:val);
end; """, val=test_string)
string_var = cursor.var(str)
number_var = cursor.var(int)
cursor.callproc("dbms_output.get_line", (string_var, number_var))
self.assertEqual(string_var.getvalue(), test_string)
@unittest.skipIf(test_env.get_client_version() < (18, 1),
"unsupported client")
def test_1135_calltimeout(self):
"1135 - test connection call_timeout"
conn = test_env.get_connection()
conn.call_timeout = 500 # milliseconds
self.assertEqual(conn.call_timeout, 500)
self.assertRaises(oracledb.DatabaseError, conn.cursor().callproc,
test_env.get_sleep_proc_name(), [2])
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -301,5 +301,39 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(coll.find().count(), 0)
coll.drop()
def test_3414_soda_hint(self):
"3414 - verify hints are reflected in the executed SQL statement"
soda_db = self.connection.getSodaDatabase()
cursor = self.connection.cursor()
statement = """
SELECT
( SELECT t2.sql_fulltext
FROM v$sql t2
WHERE t2.sql_id = t1.prev_sql_id
AND t2.child_number = t1.prev_child_number
)
FROM v$session t1
WHERE t1.audsid = sys_context('userenv', 'sessionid')"""
coll = soda_db.createCollection("cxoSodaHint")
coll.find().remove()
values_to_insert = [
{"name": "George", "age": 47},
{"name": "Susan", "age": 39},
]
coll.insertOneAndGet(values_to_insert[0], hint="MONITOR")
cursor.execute(statement)
result, = cursor.fetchone()
self.assertTrue('MONITOR' in result.read())
coll.find().hint("MONITOR").getOne().getContent()
cursor.execute(statement)
result, = cursor.fetchone()
self.assertTrue('MONITOR' in result.read())
coll.insertOneAndGet(values_to_insert[1], hint="NO_MONITOR")
cursor.execute(statement)
result, = cursor.fetchone()
self.assertTrue('NO_MONITOR' in result.read())
if __name__ == "__main__":
test_env.run_test_cases()

View File

@ -137,6 +137,11 @@ def get_proxy_password():
def get_proxy_user():
return get_value("PROXY_USER", "Proxy User Name", DEFAULT_PROXY_USER)
def get_sleep_proc_name():
server_version = get_server_version()
return "dbms_session.sleep" if server_version[0] >= 18 \
else "dbms_lock.sleep"
def get_server_version():
name = "SERVER_VERSION"
value = PARAMETERS.get(name)