Migrate samples to use PEP 8 naming style consistently.

This commit is contained in:
Anthony Tuininga 2021-04-23 13:50:06 -06:00
parent cccfa322c7
commit fa8d6fa5be
50 changed files with 237 additions and 226 deletions

View File

@ -16,7 +16,7 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
# define constants used throughout the script; adjust as desired
@ -27,8 +27,8 @@ APP_CTX_ENTRIES = [
( APP_CTX_NAMESPACE, "ATTR3", "VALUE3" )
]
connection = cx_Oracle.connect(sample_env.get_main_connect_string(),
appcontext=APP_CTX_ENTRIES)
connection = oracledb.connect(sample_env.get_main_connect_string(),
appcontext=APP_CTX_ENTRIES)
cursor = connection.cursor()
for namespace, name, value in APP_CTX_ENTRIES:
cursor.execute("select sys_context(:1, :2) from dual", (namespace, name))

View File

@ -12,27 +12,27 @@
# This script requires cx_Oracle 6.4 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import threading
import time
import cx_Oracle as oracledb
import sample_env
registered = True
def ProcessMessages(message):
def process_messages(message):
global registered
print("Message type:", message.type)
if message.type == cx_Oracle.EVENT_DEREG:
if message.type == oracledb.EVENT_DEREG:
print("Deregistration has taken place...")
registered = False
return
print("Queue name:", message.queueName)
print("Consumer name:", message.consumerName)
connection = cx_Oracle.connect(sample_env.get_main_connect_string(),
events=True)
sub = connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_AQ,
name="DEMO_BOOK_QUEUE", callback=ProcessMessages,
connection = oracledb.connect(sample_env.get_main_connect_string(),
events=True)
sub = connection.subscribe(namespace=oracledb.SUBSCR_NAMESPACE_AQ,
name="DEMO_BOOK_QUEUE", callback=process_messages,
timeout=300)
print("Subscription:", sub)
print("--> Connection:", sub.connection)

View File

@ -13,10 +13,10 @@
# This script requires cx_Oracle 5.2 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# show the number of rows for each parent ID as a means of verifying the

View File

@ -15,10 +15,10 @@
# This script requires cx_Oracle 5.2 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# define data to insert
@ -47,7 +47,7 @@ print("number of rows to insert:", len(data_to_insert))
try:
cursor.executemany("insert into ChildTable values (:1, :2, :3)",
data_to_insert)
except cx_Oracle.DatabaseError as e:
except oracledb.DatabaseError as e:
error, = e.args
print("FAILED with error:", error.message)
print("number of rows which succeeded:", cursor.rowcount)

View File

@ -8,10 +8,10 @@
# Demonstrate how to insert a row into a table using bind variables.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
#------------------------------------------------------------------------------
# "Bind by position"

View File

@ -12,10 +12,10 @@
# special characters or SQL injection attacks.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
sql = 'select * from SampleQueryTab where id = :bvid'

View File

@ -16,7 +16,7 @@
# This script requires cx_Oracle 7.2 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
QUEUE_NAME = "DEMO_RAW_QUEUE"
@ -36,13 +36,13 @@ PAYLOAD_DATA = [
]
# connect to database
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
queue.deqOptions.wait = oracledb.DEQ_NO_WAIT
queue.deqOptions.navigation = oracledb.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent

View File

@ -13,10 +13,10 @@
# higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection.call_timeout = 2000
print("Call timeout set at", connection.call_timeout, "milliseconds...")
@ -33,7 +33,7 @@ sleep_proc_name = "dbms_session.sleep" \
print("Sleeping...should time out...")
try:
cursor.callproc(sleep_proc_name, (3,))
except cx_Oracle.DatabaseError as e:
except oracledb.DatabaseError as e:
print("ERROR:", e)
cursor.execute("select sysdate from dual")

View File

@ -20,27 +20,22 @@
#
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import threading
import cx_Oracle as oracledb
import sample_env
# Create a Connection Pool
pool = cx_Oracle.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2,
max=5, increment=1, threaded=True)
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2,
max=5, increment=1, threaded=True)
# dbms_session.sleep() replaces dbms_lock.sleep() from Oracle Database 18c
with pool.acquire() as conn:
sleep_proc_name = "dbms_session.sleep" \
if int(conn.version.split(".")[0]) >= 18 \
else "dbms_lock.sleep"
def TheLongQuery():
def the_long_query():
with pool.acquire() as conn:
cursor = conn.cursor()
cursor.arraysize = 25000
print("TheLongQuery(): beginning execute...")
print("the_long_query(): beginning execute...")
cursor.execute("""
select *
from
@ -50,27 +45,32 @@ def TheLongQuery():
cross join TestNumbers
cross join TestNumbers
cross join TestNumbers""")
print("TheLongQuery(): done execute...")
print("the_long_query(): done execute...")
while True:
rows = cursor.fetchmany()
if not rows:
break
print("TheLongQuery(): fetched", len(rows), "rows...")
print("TheLongQuery(): all done!")
print("the_long_query(): fetched", len(rows), "rows...")
print("the_long_query(): all done!")
def DoALock():
def do_a_lock():
with pool.acquire() as conn:
# dbms_session.sleep() replaces dbms_lock.sleep()
# from Oracle Database 18c
sleep_proc_name = "dbms_session.sleep" \
if int(conn.version.split(".")[0]) >= 18 \
else "dbms_lock.sleep"
cursor = conn.cursor()
print("DoALock(): beginning execute...")
print("do_a_lock(): beginning execute...")
cursor.callproc(sleep_proc_name, (5,))
print("DoALock(): done execute...")
print("do_a_lock(): done execute...")
thread1 = threading.Thread(None, TheLongQuery)
thread1 = threading.Thread(target=the_long_query)
thread1.start()
thread2 = threading.Thread(None, DoALock)
thread2 = threading.Thread(target=do_a_lock)
thread2.start()
thread1.join()

View File

@ -17,10 +17,11 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import time
import cx_Oracle as oracledb
import sample_env
registered = True
def callback(message):
@ -47,10 +48,10 @@ def callback(message):
print("-" * 60)
print("=" * 60)
connection = cx_Oracle.connect(sample_env.get_main_connect_string(),
events=True)
sub = connection.subscribe(callback = callback, timeout = 1800,
qos = cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS)
connection = oracledb.connect(sample_env.get_main_connect_string(),
events=True)
qos = oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS
sub = connection.subscribe(callback=callback, timeout=1800, qos=qos)
print("Subscription:", sub)
print("--> Connection:", sub.connection)
print("--> Callback:", sub.callback)
@ -58,7 +59,7 @@ print("--> Namespace:", sub.namespace)
print("--> Protocol:", sub.protocol)
print("--> Timeout:", sub.timeout)
print("--> Operations:", sub.operations)
print("--> Rowids?:", bool(sub.qos & cx_Oracle.SUBSCR_QOS_ROWIDS))
print("--> Rowids?:", bool(sub.qos & oracledb.SUBSCR_QOS_ROWIDS))
queryId = sub.registerquery("select * from TestTempTable")
print("Registered query:", queryId)

View File

@ -7,7 +7,7 @@
# This script demonstrates using continuous query notification in Python, a
# feature that is available in Oracle 11g and later. Once this script is
# running, use another session to insert, update or delete rows from the table
# cx_Oracle.TestTempTable and you will see the notification of that change.
# TestTempTable and you will see the notification of that change.
#
# This script differs from cqn.py in that it shows how a connection can be
# acquired from a session pool and used to query the changes that have been
@ -16,10 +16,11 @@
# This script requires cx_Oracle 7 or higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import time
import cx_Oracle as oracledb
import sample_env
registered = True
def callback(message):
@ -37,13 +38,13 @@ def callback(message):
num_rows_deleted = 0
print(len(table.rows), "row changes detected in table", table.name)
for row in table.rows:
if row.operation & cx_Oracle.OPCODE_DELETE:
if row.operation & oracledb.OPCODE_DELETE:
num_rows_deleted += 1
continue
ops = []
if row.operation & cx_Oracle.OPCODE_INSERT:
if row.operation & oracledb.OPCODE_INSERT:
ops.append("inserted")
if row.operation & cx_Oracle.OPCODE_UPDATE:
if row.operation & oracledb.OPCODE_UPDATE:
ops.append("updated")
cursor = connection.cursor()
cursor.execute("""
@ -57,13 +58,13 @@ def callback(message):
print(" ", num_rows_deleted, "rows deleted")
print("=" * 60)
pool = cx_Oracle.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2,
max=5, increment=1, events=True, threaded=True)
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, events=True, threaded=True)
with pool.acquire() as connection:
sub = connection.subscribe(callback=callback, timeout=1800,
qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS)
qos = oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS
sub = connection.subscribe(callback=callback, timeout=1800, qos=qos)
print("Subscription created with ID:", sub.id)
query_id = sub.registerquery("select * from TestTempTable")
print("Registered query with ID:", query_id)

View File

@ -17,10 +17,11 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import time
import cx_Oracle as oracledb
import sample_env
registered = True
def callback(message):
@ -44,10 +45,10 @@ def callback(message):
print("-" * 60)
print("=" * 60)
connection = cx_Oracle.connect(sample_env.get_main_connect_string(),
events=True)
sub = connection.subscribe(callback = callback, timeout = 1800,
qos = cx_Oracle.SUBSCR_QOS_ROWIDS)
connection = oracledb.connect(sample_env.get_main_connect_string(),
events=True)
sub = connection.subscribe(callback=callback, timeout=1800,
qos=oracledb.SUBSCR_QOS_ROWIDS)
print("Subscription:", sub)
print("--> Connection:", sub.connection)
print("--> ID:", sub.id)
@ -56,7 +57,7 @@ print("--> Namespace:", sub.namespace)
print("--> Protocol:", sub.protocol)
print("--> Timeout:", sub.timeout)
print("--> Operations:", sub.operations)
print("--> Rowids?:", bool(sub.qos & cx_Oracle.SUBSCR_QOS_ROWIDS))
print("--> Rowids?:", bool(sub.qos & oracledb.SUBSCR_QOS_ROWIDS))
sub.registerquery("select * from TestTempTable")
while registered:

View File

@ -16,14 +16,14 @@
# This script requires cx_Oracle 4.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
# need to connect as SYSDBA or SYSOPER
connection = cx_Oracle.connect(mode=cx_Oracle.SYSDBA)
connection = oracledb.connect(mode=oracledb.SYSDBA)
# first shutdown() call must specify the mode, if DBSHUTDOWN_ABORT is used,
# there is no need for any of the other steps
connection.shutdown(mode=cx_Oracle.DBSHUTDOWN_IMMEDIATE)
connection.shutdown(mode=oracledb.DBSHUTDOWN_IMMEDIATE)
# now close and dismount the database
cursor = connection.cursor()
@ -31,4 +31,4 @@ cursor.execute("alter database close normal")
cursor.execute("alter database dismount")
# perform the final shutdown call
connection.shutdown(mode=cx_Oracle.DBSHUTDOWN_FINAL)
connection.shutdown(mode=oracledb.DBSHUTDOWN_FINAL)

View File

@ -16,14 +16,14 @@
# This script requires cx_Oracle 4.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
# the connection must be in PRELIM_AUTH mode
connection = cx_Oracle.connect(mode=cx_Oracle.SYSDBA | cx_Oracle.PRELIM_AUTH)
connection = oracledb.connect(mode=oracledb.SYSDBA | oracledb.PRELIM_AUTH)
connection.startup()
# the following statements must be issued in normal SYSDBA mode
connection = cx_Oracle.connect("/", mode=cx_Oracle.SYSDBA)
connection = oracledb.connect("/", mode=oracledb.SYSDBA)
cursor = connection.cursor()
cursor.execute("alter database mount")
cursor.execute("alter database open")

View File

@ -8,10 +8,10 @@
# the DBMS_OUTPUT package.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# enable DBMS_OUTPUT

View File

@ -15,12 +15,11 @@
# This script requires cx_Oracle 6.0 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import datetime
import cx_Oracle as oracledb
import sample_env
# truncate table first so that script can be rerun
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
print("Truncating table...")
cursor.execute("truncate table TestTempTable")

View File

@ -32,11 +32,11 @@
#
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
conn = cx_Oracle.connect(sample_env.get_drcp_connect_string(),
cclass="PYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF)
conn = oracledb.connect(sample_env.get_drcp_connect_string(),
cclass="PYCLASS", purity=oracledb.ATTR_PURITY_SELF)
cursor = conn.cursor()
print("Performing query using DRCP...")
for row in cursor.execute("select * from TestNumbers order by IntCol"):

View File

@ -8,7 +8,7 @@
# Drops the database objects used for the cx_Oracle samples.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
def drop_samples(conn):
@ -19,6 +19,6 @@ def drop_samples(conn):
edition_name=sample_env.get_edition_name())
if __name__ == "__main__":
conn = cx_Oracle.connect(sample_env.get_admin_connect_string())
conn = oracledb.connect(sample_env.get_admin_connect_string())
drop_samples(conn)
print("Done.")

View File

@ -17,14 +17,15 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import os
import cx_Oracle as oracledb
import sample_env
# connect to the editions user and create a procedure
edition_connect_string = sample_env.get_edition_connect_string()
edition_name = sample_env.get_edition_name()
connection = cx_Oracle.connect(edition_connect_string)
connection = oracledb.connect(edition_connect_string)
print("Edition should be None, actual value is:", repr(connection.edition))
cursor = connection.cursor()
cursor.execute("""
@ -57,8 +58,8 @@ print("Function should return 'Base Procedure', actually returns:",
repr(result))
# the edition can be set upon connection
connection = cx_Oracle.connect(edition_connect_string,
edition=edition_name.upper())
connection = oracledb.connect(edition_connect_string,
edition=edition_name.upper())
cursor = connection.cursor()
result = cursor.callfunc("TestEditions", str)
print("Function should return 'Edition 1 Procedure', actually returns:",
@ -66,7 +67,7 @@ print("Function should return 'Edition 1 Procedure', actually returns:",
# it can also be set via the environment variable ORA_EDITION
os.environ["ORA_EDITION"] = edition_name.upper()
connection = cx_Oracle.connect(edition_connect_string)
connection = oracledb.connect(edition_connect_string)
print("Edition should be", repr(edition_name.upper()),
"actual value is:", repr(connection.edition))
cursor = connection.cursor()

View File

@ -10,23 +10,23 @@
#------------------------------------------------------------------------------
import collections
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
class Connection(cx_Oracle.Connection):
class Connection(oracledb.Connection):
def cursor(self):
return Cursor(self)
class Cursor(cx_Oracle.Cursor):
class Cursor(oracledb.Cursor):
def execute(self, statement, args = None):
prepare_needed = (self.statement != statement)
result = super(Cursor, self).execute(statement, args or [])
result = super().execute(statement, args or [])
if prepare_needed:
description = self.description
if description:
if description is not None:
names = [d[0] for d in description]
self.rowfactory = collections.namedtuple("GenericQuery", names)
return result

View File

@ -16,10 +16,10 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# use PL/SQL block to return two cursors

View File

@ -15,11 +15,11 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
# create and populate Oracle objects
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
type_obj = connection.gettype("MDSYS.SDO_GEOMETRY")
element_info_type_obj = connection.gettype("MDSYS.SDO_ELEM_INFO_ARRAY")
ordinate_type_obj = connection.gettype("MDSYS.SDO_ORDINATE_ARRAY")

View File

@ -17,12 +17,12 @@
import sys
import json
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
client_version = cx_Oracle.clientversion()[0]
client_version = oracledb.clientversion()[0]
db_version = int(connection.version.split(".")[0])
# Minimum database vesion is 12
@ -52,7 +52,7 @@ data = dict(name="Rod", dept="Sales", location="Germany")
inssql = "insert into customers values (:1, :2)"
if client_version >= 21 and db_version >= 21:
# Take advantage of direct binding
cursor.setinputsizes(None, cx_Oracle.DB_TYPE_JSON)
cursor.setinputsizes(None, oracledb.DB_TYPE_JSON)
cursor.execute(inssql, [1, data])
else:
# Insert the data as a JSON string

View File

@ -12,12 +12,12 @@
import sys
import json
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
client_version = cx_Oracle.clientversion()[0]
client_version = oracledb.clientversion()[0]
db_version = int(connection.version.split(".")[0])
# this script only works with Oracle Database 21
@ -49,7 +49,7 @@ data = dict(name="Rod", dept="Sales", location="Germany")
inssql = "insert into customers values (:1, :2)"
if client_version >= 21:
# Take advantage of direct binding
cursor.setinputsizes(None, cx_Oracle.DB_TYPE_JSON)
cursor.setinputsizes(None, oracledb.DB_TYPE_JSON)
cursor.execute(inssql, [1, data])
else:
# Insert the data as a JSON string

View File

@ -9,10 +9,10 @@
# This script requires cx_Oracle 7.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
row1 = [1, "First"]
row2 = [2, "Second"]

View File

@ -15,7 +15,7 @@
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
QUEUE_NAME = "DEMO_RAW_QUEUE_MULTI"
@ -27,13 +27,13 @@ PAYLOAD_DATA = [
]
# connect to database
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqoptions.navigation = cx_Oracle.DEQ_FIRST_MSG
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# enqueue a few messages
print("Enqueuing messages...")

View File

@ -16,10 +16,11 @@
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import decimal
import cx_Oracle as oracledb
import sample_env
BOOK_TYPE_NAME = "UDT_BOOK"
QUEUE_NAME = "DEMO_BOOK_QUEUE"
BOOK_DATA = [
@ -30,14 +31,14 @@ BOOK_DATA = [
]
# connect to database
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# create queue
books_type = connection.gettype(BOOK_TYPE_NAME)
queue = connection.queue(QUEUE_NAME, payload_type=books_type)
queue.deqoptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqoptions.navigation = cx_Oracle.DEQ_FIRST_MSG
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent

View File

@ -13,10 +13,10 @@
# is new in cx_Oracle 7.0.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
# create new empty object of the correct type
# note the use of a PL/SQL type defined in a package

View File

@ -8,10 +8,10 @@
# Demonstrate how to call a PL/SQL function and get its return value.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
res = cursor.callfunc('myfunc', int, ('abc', 2))

View File

@ -9,10 +9,10 @@
# OUT variable.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
myvar = cursor.var(int)

View File

@ -11,11 +11,12 @@
# Database 12.1 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import datetime
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
import cx_Oracle as oracledb
import sample_env
connection = oracledb.connect(sample_env.get_main_connect_string())
# create new object of the correct type
# note the use of a PL/SQL record defined in a package

View File

@ -8,10 +8,10 @@
# Demonstrate how to perform a query in different ways.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
sql = """
select * from SampleQueryTab

View File

@ -11,10 +11,11 @@
#------------------------------------------------------------------------------
import time
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
start = time.time()

View File

@ -15,7 +15,7 @@
# This script requires cx_Oracle 8.2 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
QUEUE_NAME = "DEMO_RAW_QUEUE"
@ -27,13 +27,13 @@ PAYLOAD_DATA = [
]
# connect to database
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqoptions.navigation = cx_Oracle.DEQ_FIRST_MSG
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent

View File

@ -7,10 +7,10 @@
# Demonstrates the use of REF cursors with cx_Oracle.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
ref_cursor = connection.cursor()

View File

@ -18,16 +18,16 @@
# This script requires cx_Oracle 5.0 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
def output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type == cx_Oracle.CLOB:
return cursor.var(cx_Oracle.LONG_STRING, arraysize=cursor.arraysize)
if default_type == cx_Oracle.BLOB:
return cursor.var(cx_Oracle.LONG_BINARY, arraysize=cursor.arraysize)
if default_type == oracledb.CLOB:
return cursor.var(oracledb.LONG_STRING, arraysize=cursor.arraysize)
if default_type == oracledb.BLOB:
return cursor.var(oracledb.LONG_BINARY, arraysize=cursor.arraysize)
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection.outputtypehandler = output_type_handler
cursor = connection.cursor()
@ -40,11 +40,11 @@ for i in range(10):
char = chr(ord('A') + i)
long_string += char * 25000
# uncomment the line below for cx_Oracle 5.3 and earlier
# cursor.setinputsizes(None, cx_Oracle.LONG_STRING)
# cursor.setinputsizes(None, oracledb.LONG_STRING)
cursor.execute("insert into TestClobs values (:1, :2)",
(i + 1, "STRING " + long_string))
# uncomment the line below for cx_Oracle 5.3 and earlier
# cursor.setinputsizes(None, cx_Oracle.LONG_BINARY)
# cursor.setinputsizes(None, oracledb.LONG_BINARY)
cursor.execute("insert into TestBlobs values (:1, :2)",
(i + 1, long_string.encode("ascii")))
connection.commit()

View File

@ -14,15 +14,16 @@
# This script requires cx_Oracle 5.0 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import decimal
import cx_Oracle as oracledb
import sample_env
def output_type_handler(cursor, name, defaultType, size, precision, scale):
if defaultType == cx_Oracle.NUMBER:
return cursor.var(decimal.Decimal, arraysize = cursor.arraysize)
def output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type == oracledb.NUMBER:
return cursor.var(decimal.Decimal, arraysize=cursor.arraysize)
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection.outputtypehandler = output_type_handler
cursor = connection.cursor()
cursor.execute("select * from TestNumbers")

View File

@ -16,17 +16,17 @@
# This script requires cx_Oracle 4.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
class Test(object):
class Test:
def __init__(self, a, b, c):
self.a = a
self.b = b
self.c = c
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# change this to False if you want to create the table yourself using SQL*Plus

View File

@ -16,10 +16,10 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
# show all of the rows available in the table
cursor = connection.cursor()

View File

@ -20,7 +20,7 @@
#
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
# define a dictionary of NLS_DATE_FORMAT formats supported by this sample
@ -52,22 +52,22 @@ def init_session(conn, requested_tag):
# in this example, they are used to set NLS parameters and the tag is
# parsed to validate it
if requested_tag is not None:
stateParts = []
state_parts = []
for directive in requested_tag.split(";"):
parts = directive.split("=")
if len(parts) != 2:
raise ValueError("Tag must contain key=value pairs")
key, value = parts
valueDict = SUPPORTED_KEYS.get(key)
if valueDict is None:
value_dict = SUPPORTED_KEYS.get(key)
if value_dict is None:
raise ValueError("Tag only supports keys: %s" % \
(", ".join(SUPPORTED_KEYS)))
actualValue = valueDict.get(value)
if actualValue is None:
actual_value = value_dict.get(value)
if actual_value is None:
raise ValueError("Key %s only supports values: %s" % \
(key, ", ".join(valueDict)))
stateParts.append("%s = %s" % (key, actualValue))
sql = "alter session set %s" % " ".join(stateParts)
(key, ", ".join(value_dict)))
state_parts.append("%s = %s" % (key, actual_value))
sql = "alter session set %s" % " ".join(state_parts)
cursor = conn.cursor()
cursor.execute(sql)
@ -78,11 +78,11 @@ def init_session(conn, requested_tag):
# create pool with session callback defined
pool = cx_Oracle.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, threaded=True,
session_callback=init_session)
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, threaded=True,
session_callback=init_session)
# acquire session without specifying a tag; since the session returned is
# newly created, the callback will be invoked but since there is no tag

View File

@ -22,15 +22,15 @@
#
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
# create pool with session callback defined
pool = cx_Oracle.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, threaded=True,
session_callback="pkg_SessionCallback.TheCallback")
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, threaded=True,
session_callback="pkg_SessionCallback.TheCallback")
# truncate table logging calls to PL/SQL session callback
with pool.acquire() as conn:

View File

@ -10,13 +10,13 @@
# demonstration of PL/SQL editioning.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
import drop_samples
# connect as administrative user (usually SYSTEM or ADMIN)
conn = cx_Oracle.connect(sample_env.get_admin_connect_string())
conn = oracledb.connect(sample_env.get_admin_connect_string())
# drop existing users and editions, if applicable
drop_samples.drop_samples(conn)

View File

@ -15,13 +15,13 @@
# sharding capabilities.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
pool = cx_Oracle.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=1, max=5,
increment=1)
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=1, max=5,
increment=1)
def connect_and_display(sharding_key):
print("Connecting with sharding key:", sharding_key)

View File

@ -12,10 +12,10 @@
# The user must have been granted the SODA_APP privilege.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
# The general recommendation for simple SODA usage is to enable autocommit
connection.autocommit = True

View File

@ -12,10 +12,10 @@
# The user must have been granted the SODA_APP privilege.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
# the general recommendation for simple SODA usage is to enable autocommit
connection.autocommit = True

View File

@ -22,13 +22,14 @@
# dependencies (see http://geopandas.org/install.html).
#------------------------------------------------------------------------------
import sample_env
import cx_Oracle
from shapely.wkb import loads
import geopandas as gpd
import cx_Oracle as oracledb
import sample_env
# create Oracle connection and cursor objects
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
# enable autocommit to avoid the additional round trip to the database to
@ -39,8 +40,8 @@ connection.autocommit = True
# define output type handler to fetch LOBs, avoiding the second round trip to
# the database to read the LOB contents
def output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type == cx_Oracle.BLOB:
return cursor.var(cx_Oracle.LONG_BINARY, arraysize=cursor.arraysize)
if default_type == oracledb.BLOB:
return cursor.var(oracledb.LONG_BINARY, arraysize=cursor.arraysize)
connection.outputtypehandler = output_type_handler
# drop and create table

View File

@ -10,18 +10,18 @@
# for paticular applications.
#------------------------------------------------------------------------------
import cx_Oracle
import cx_Oracle as oracledb
import sample_env
# sample subclassed connection which overrides the constructor (so no
# parameters are required) and the cursor() method (so that the subclassed
# cursor is returned instead of the default cursor implementation)
class Connection(cx_Oracle.Connection):
class Connection(oracledb.Connection):
def __init__(self):
connect_string = sample_env.get_main_connect_string()
print("CONNECT to database")
return super(Connection, self).__init__(connect_string)
super().__init__(connect_string)
def cursor(self):
return Cursor(self)
@ -29,18 +29,18 @@ class Connection(cx_Oracle.Connection):
# sample subclassed cursor which overrides the execute() and fetchone()
# methods in order to perform some simple logging
class Cursor(cx_Oracle.Cursor):
class Cursor(oracledb.Cursor):
def execute(self, statement, args):
print("EXECUTE", statement)
print("ARGS:")
for arg_index, arg in enumerate(args):
print(" ", arg_index + 1, "=>", repr(arg))
return super(Cursor, self).execute(statement, args)
return super().execute(statement, args)
def fetchone(self):
print("FETCH ONE")
return super(Cursor, self).fetchone()
return super().fetchone()
# create instances of the subclassed connection and cursor

View File

@ -33,17 +33,18 @@
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import sample_env
import sys
import cx_Oracle as oracledb
import sample_env
# constants
CONNECT_STRING = "localhost/orcl-tg"
# create transaction and generate a recoverable error
pool = cx_Oracle.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=CONNECT_STRING, min=1, max=9, increment=2)
pool = oracledb.SessionPool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=CONNECT_STRING, min=1, max=9, increment=2)
connection = pool.acquire()
cursor = connection.cursor()
cursor.execute("""
@ -57,7 +58,7 @@ input("Please kill %s session now. Press ENTER when complete." % \
try:
connection.commit() # this should fail
sys.exit("Session was not killed. Terminating.")
except cx_Oracle.DatabaseError as e:
except oracledb.DatabaseError as e:
errorObj, = e.args
if not errorObj.isrecoverable:
sys.exit("Session is not recoverable. Terminating.")
@ -69,7 +70,7 @@ pool.drop(connection)
# check if previous transaction completed
connection = pool.acquire()
cursor = connection.cursor()
args = (cx_Oracle.Binary(ltxid), cursor.var(bool), cursor.var(bool))
args = (oracledb.Binary(ltxid), cursor.var(bool), cursor.var(bool))
_, committed, completed = cursor.callproc("dbms_app_cont.get_ltxid_outcome",
args)
print("Failed transaction was committed:", committed)

View File

@ -17,20 +17,21 @@
# This script requires cx_Oracle 5.0 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import datetime
import cx_Oracle as oracledb
import sample_env
con = cx_Oracle.connect(sample_env.get_main_connect_string())
con = oracledb.connect(sample_env.get_main_connect_string())
obj_type = con.gettype("UDT_BUILDING")
class Building(object):
class Building:
def __init__(self, building_id, description, num_floors, dateBuilt):
def __init__(self, building_id, description, num_floors, date_built):
self.building_id = building_id
self.description = description
self.num_floors = num_floors
self.dateBuilt = dateBuilt
self.date_built = date_built
def __repr__(self):
return "<Building %s: %s>" % (self.building_id, self.description)
@ -41,7 +42,7 @@ def building_in_converter(value):
obj.BUILDINGID = value.building_id
obj.DESCRIPTION = value.description
obj.NUMFLOORS = value.num_floors
obj.DATEBUILT = value.dateBuilt
obj.DATEBUILT = value.date_built
return obj
@ -50,14 +51,14 @@ def building_out_converter(obj):
obj.DATEBUILT)
def input_type_handler(cursor, value, numElements):
def input_type_handler(cursor, value, num_elements):
if isinstance(value, Building):
return cursor.var(cx_Oracle.OBJECT, arraysize=numElements,
return cursor.var(oracledb.OBJECT, arraysize=num_elements,
inconverter=building_in_converter, typename=obj_type.name)
def output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type == cx_Oracle.OBJECT:
return cursor.var(cx_Oracle.OBJECT, arraysize=cursor.arraysize,
if default_type == oracledb.OBJECT:
return cursor.var(oracledb.OBJECT, arraysize=cursor.arraysize,
outconverter=building_out_converter,
typename=obj_type.name)
@ -73,7 +74,7 @@ for building in buildings:
try:
cur.execute("insert into TestBuildings values (:1, :2)",
(building.building_id, building))
except cx_Oracle.DatabaseError as e:
except oracledb.DatabaseError as e:
error, = e.args
print("CONTEXT:", error.context)
print("MESSAGE:", error.message)

View File

@ -15,8 +15,9 @@
# This script requires cx_Oracle 6.0 and higher.
#------------------------------------------------------------------------------
import cx_Oracle
import datetime
import cx_Oracle as oracledb
import sample_env
DATA = [
@ -26,7 +27,7 @@ DATA = [
]
# truncate table so sample can be rerun
connection = cx_Oracle.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
print("Truncating table...")
cursor.execute("truncate table TestUniversalRowids")