diff --git a/test/test_1100_connection.py b/test/test_1100_connection.py index e700e3b..2c036a1 100644 --- a/test/test_1100_connection.py +++ b/test/test_1100_connection.py @@ -15,6 +15,7 @@ import random import string import threading import time +import unittest import cx_Oracle as oracledb import test_env @@ -488,9 +489,6 @@ class TestCase(test_env.BaseTestCase): def test_1130_cancel(self): "1130 - test connection cancel" conn = test_env.get_connection() - sleep_proc_name = "dbms_session.sleep" \ - if int(conn.version.split(".")[0]) >= 18 \ - else "dbms_lock.sleep" def perform_cancel(): time.sleep(0.1) conn.cancel() @@ -499,7 +497,7 @@ class TestCase(test_env.BaseTestCase): try: with conn.cursor() as cursor: self.assertRaises(oracledb.OperationalError, cursor.callproc, - sleep_proc_name, [2]) + test_env.get_sleep_proc_name(), [2]) finally: thread.join() with conn.cursor() as cursor: @@ -507,5 +505,89 @@ class TestCase(test_env.BaseTestCase): user, = cursor.fetchone() self.assertEqual(user, test_env.get_main_user().upper()) + def test_1131_change_password_during_connect(self): + "1131 - test changing password during connect" + connection = test_env.get_connection() + if self.is_on_oracle_cloud(connection): + self.skipTest("passwords on Oracle Cloud are strictly controlled") + sys_random = random.SystemRandom() + new_password = "".join(sys_random.choice(string.ascii_letters) \ + for i in range(20)) + connection = oracledb.connect(dsn=test_env.get_connect_string(), + user=test_env.get_main_user(), + password=test_env.get_main_password(), + newpassword=new_password) + connection = oracledb.connect(dsn=test_env.get_connect_string(), + user=test_env.get_main_user(), + password=new_password) + connection.changepassword(new_password, test_env.get_main_password()) + + def test_1132_autocommit_during_reexecute(self): + "1132 - test use of autocommit during reexecute" + sql = "insert into TestTempTable (IntCol, StringCol) values (:1, :2)" + data_to_insert = [ + (1, "Test String #1"), + (2, "Test String #2") + ] + connection = test_env.get_connection() + cursor = connection.cursor() + other_connection = test_env.get_connection() + other_cursor = other_connection.cursor() + cursor.execute("truncate table TestTempTable") + cursor.execute(sql, data_to_insert[0]) + other_cursor.execute("select IntCol, StringCol from TestTempTable") + rows = other_cursor.fetchall() + self.assertEqual(rows, []) + connection.autocommit = True + cursor.execute(sql, data_to_insert[1]) + other_cursor.execute("select IntCol, StringCol from TestTempTable") + rows = other_cursor.fetchall() + self.assertEqual(rows, data_to_insert) + + def test_1133_current_schema(self): + "1133 - test current_schame is set properly" + conn = test_env.get_connection() + self.assertEqual(conn.current_schema, None) + + user = test_env.get_main_user().upper() + proxy_user = test_env.get_proxy_user().upper() + cursor = conn.cursor() + cursor.execute(f'alter session set current_schema={proxy_user}') + self.assertEqual(conn.current_schema, proxy_user) + + conn.current_schema = user + self.assertEqual(conn.current_schema, user) + + cursor.execute(""" + select sys_context('userenv', 'current_schema') + from dual""") + result, = cursor.fetchone() + self.assertEqual(result, user) + + def test_1134_dbms_output(self): + "1134 - test dbms_output package" + conn = test_env.get_connection() + cursor = conn.cursor() + test_string = "Testing DBMS_OUTPUT package" + cursor.callproc("dbms_output.enable") + cursor.execute(""" + begin + dbms_output.put_line(:val); + end; """, val=test_string) + string_var = cursor.var(str) + number_var = cursor.var(int) + cursor.callproc("dbms_output.get_line", (string_var, number_var)) + self.assertEqual(string_var.getvalue(), test_string) + + @unittest.skipIf(test_env.get_client_version() < (18, 1), + "unsupported client") + def test_1135_calltimeout(self): + "1135 - test connection call_timeout" + conn = test_env.get_connection() + conn.call_timeout = 500 # milliseconds + self.assertEqual(conn.call_timeout, 500) + self.assertRaises(oracledb.DatabaseError, conn.cursor().callproc, + test_env.get_sleep_proc_name(), [2]) + if __name__ == "__main__": test_env.run_test_cases() diff --git a/test/test_3400_soda_collection.py b/test/test_3400_soda_collection.py index 2b4361d..66c504c 100644 --- a/test/test_3400_soda_collection.py +++ b/test/test_3400_soda_collection.py @@ -301,5 +301,39 @@ class TestCase(test_env.BaseTestCase): self.assertEqual(coll.find().count(), 0) coll.drop() + def test_3414_soda_hint(self): + "3414 - verify hints are reflected in the executed SQL statement" + soda_db = self.connection.getSodaDatabase() + cursor = self.connection.cursor() + statement = """ + SELECT + ( SELECT t2.sql_fulltext + FROM v$sql t2 + WHERE t2.sql_id = t1.prev_sql_id + AND t2.child_number = t1.prev_child_number + ) + FROM v$session t1 + WHERE t1.audsid = sys_context('userenv', 'sessionid')""" + coll = soda_db.createCollection("cxoSodaHint") + coll.find().remove() + values_to_insert = [ + {"name": "George", "age": 47}, + {"name": "Susan", "age": 39}, + ] + coll.insertOneAndGet(values_to_insert[0], hint="MONITOR") + cursor.execute(statement) + result, = cursor.fetchone() + self.assertTrue('MONITOR' in result.read()) + + coll.find().hint("MONITOR").getOne().getContent() + cursor.execute(statement) + result, = cursor.fetchone() + self.assertTrue('MONITOR' in result.read()) + + coll.insertOneAndGet(values_to_insert[1], hint="NO_MONITOR") + cursor.execute(statement) + result, = cursor.fetchone() + self.assertTrue('NO_MONITOR' in result.read()) + if __name__ == "__main__": test_env.run_test_cases() diff --git a/test/test_env.py b/test/test_env.py index ec59d20..dbabf89 100644 --- a/test/test_env.py +++ b/test/test_env.py @@ -137,6 +137,11 @@ def get_proxy_password(): def get_proxy_user(): return get_value("PROXY_USER", "Proxy User Name", DEFAULT_PROXY_USER) +def get_sleep_proc_name(): + server_version = get_server_version() + return "dbms_session.sleep" if server_version[0] >= 18 \ + else "dbms_lock.sleep" + def get_server_version(): name = "SERVER_VERSION" value = PARAMETERS.get(name)