diff --git a/adodbapi/adodbapi.py b/adodbapi/adodbapi.py index 7d38754db1..456cd4275b 100644 --- a/adodbapi/adodbapi.py +++ b/adodbapi/adodbapi.py @@ -61,12 +61,6 @@ def getIndexedValue(obj, index): from collections.abc import Mapping -# --- define objects to smooth out Python3000 <-> Python 2 differences -unicodeType = str -longType = int -StringTypes = str -maxint = sys.maxsize - # ----------------- The .connect method ----------------- def make_COM_connecter(): @@ -167,7 +161,7 @@ def _configure_parameter(p, value, adotype, settings_known): p.Size = len(value) p.AppendChunk(value) - elif isinstance(value, StringTypes): # v2.1 Jevon + elif isinstance(value, str): # v2.1 Jevon L = len(value) if adotype in api.adoStringTypes: # v2.2.1 Cole if settings_known: diff --git a/adodbapi/apibase.py b/adodbapi/apibase.py index e663631e58..78f972ad2b 100644 --- a/adodbapi/apibase.py +++ b/adodbapi/apibase.py @@ -16,19 +16,6 @@ verbose = False # debugging flag -# --- define objects to smooth out Python3 <-> Python 2 differences -unicodeType = str -longType = int -StringTypes = str -makeByteBuffer = bytes -memoryViewType = memoryview -_BaseException = Exception - -try: # jdhardy -- handle bytes under Py3 - bytes -except NameError: - bytes = str # define it for old Pythons - # ------- Error handlers ------ def standardErrorHandler(connection, cursor, errorclass, errorvalue): @@ -45,8 +32,7 @@ def standardErrorHandler(connection, cursor, errorclass, errorvalue): raise errorclass(errorvalue) -# Note: _BaseException is defined differently between Python 2 and 3 -class Error(_BaseException): +class Error(Exception): pass # Exception that is the base class of all other error # exceptions. You can use this to catch all errors with one # single 'except' statement. Warnings are not considered @@ -55,7 +41,7 @@ class Error(_BaseException): # module exceptions). -class Warning(_BaseException): +class Warning(Exception): pass @@ -153,7 +139,7 @@ class FetchFailedError(OperationalError): # # def Binary(aString): # """This function constructs an object capable of holding a binary (long) string value. """ -# b = makeByteBuffer(aString) +# b = bytes(aString) # return b # ----- Time converters ---------------------------------------------- class TimeConverter(object): # this is a generic time converter skeleton @@ -413,7 +399,7 @@ def __ne__(self, other): # ------- utilities for translating python data types to ADO data types --------------------------------- typeMap = { - memoryViewType: adc.adVarBinary, + memoryview: adc.adVarBinary, float: adc.adDouble, type(None): adc.adEmpty, str: adc.adBSTR, @@ -435,7 +421,7 @@ def pyTypeToADOType(d): if tp in dateconverter.types: return adc.adDate # otherwise, attempt to discern the type by probing the data object itself -- to handle duck typing - if isinstance(d, StringTypes): + if isinstance(d, str): return adc.adBSTR if isinstance(d, numbers.Integral): return adc.adBigInt diff --git a/adodbapi/remote.py b/adodbapi/remote.py index bf6ab13a54..a4f6bf6a46 100644 --- a/adodbapi/remote.py +++ b/adodbapi/remote.py @@ -44,8 +44,6 @@ import adodbapi.process_connect_string from adodbapi.apibase import ProgrammingError -_BaseException = api._BaseException - sys.excepthook = Pyro4.util.excepthook Pyro4.config.PREFER_IP_VERSION = 0 # allow system to prefer IPv6 Pyro4.config.COMMTIMEOUT = 40.0 # a bit longer than the default SQL server Gtimeout @@ -58,16 +56,12 @@ if verbose: print(version) -# --- define objects to smooth out Python3 <-> Python 2 differences -unicodeType = str # this line will be altered by 2to3.py to '= str' -longType = int # this line will be altered by 2to3.py to '= int' -StringTypes = str -makeByteBuffer = bytes -memoryViewType = memoryview # ----------------------------------------------------------- # conversion functions mandated by PEP 249 -Binary = makeByteBuffer # override the function from apibase.py +def Binary(aString): + """This function constructs an object capable of holding a binary (long) string value.""" + return bytes(aString) def Date(year, month, day): @@ -164,7 +158,7 @@ def connect(*args, **kwargs): # --> a remote db-api connection object raise api.DatabaseError( "Pyro error creating connection to/thru=%s" % repr(kwargs) ) - except _BaseException as e: + except Exception as e: raise api.DatabaseError( "Error creating remote connection to=%s, e=%s, %s" % (repr(kwargs), repr(e), sys.exc_info()[2]) @@ -364,7 +358,7 @@ def fixpickle(x): # for 'named' paramstyle user will pass a mapping newargs = {} for arg, val in list(x.items()): - if isinstance(val, memoryViewType): + if isinstance(val, memoryview): newval = array.array("B") newval.fromstring(val) newargs[arg] = newval @@ -374,7 +368,7 @@ def fixpickle(x): # if not a mapping, then a sequence newargs = [] for arg in x: - if isinstance(arg, memoryViewType): + if isinstance(arg, memoryview): newarg = array.array("B") newarg.fromstring(arg) newargs.append(newarg) @@ -565,7 +559,7 @@ def callproc(self, procname, parameters=None): def fetchone(self): try: f1 = self.proxy.crsr_fetchone(self.id) - except _BaseException as e: + except Exception as e: self._raiseCursorError(api.DatabaseError, e) else: if f1 is None: diff --git a/adodbapi/remote/server.py b/adodbapi/remote/server.py index 9ac532d629..98cf60d80b 100644 --- a/adodbapi/remote/server.py +++ b/adodbapi/remote/server.py @@ -47,9 +47,6 @@ import adodbapi.apibase as api import adodbapi.process_connect_string -makeByteBuffer = bytes -_BaseException = Exception -Binary = bytes try: pyro_host = os.environ["PYRO_HOST"] except: @@ -63,25 +60,25 @@ if arg.lower().startswith("host"): try: pyro_host = arg.split("=")[1] - except _BaseException: + except Exception: raise TypeError('Must supply value for argument="%s"' % arg) if arg.lower().startswith("port"): try: pyro_port = int(arg.split("=")[1]) - except _BaseException: + except Exception: raise TypeError('Must supply numeric value for argument="%s"' % arg) if arg.lower().startswith("timeout"): try: PYRO_COMMTIMEOUT = int(arg.split("=")[1]) - except _BaseException: + except Exception: raise TypeError('Must supply numeric value for argument="%s"' % arg) if arg.lower().startswith("--verbose"): try: verbose = int(arg.split("=")[1]) - except _BaseException: + except Exception: raise TypeError('Must supply numeric value for argument="%s"' % arg) adodbapi.adodbapi.verbose = verbose else: @@ -118,7 +115,7 @@ def unfixpickle(x): newargs = {} for arg, val in list(x.items()): if isinstance(arg, array.array): - newargs[arg] = Binary(val) + newargs[arg] = bytes(val) else: newargs[arg] = val return newargs @@ -126,7 +123,7 @@ def unfixpickle(x): newargs = [] for arg in x: if isinstance(arg, array.array): - newargs.append(Binary(arg)) + newargs.append(bytes(arg)) else: newargs.append(arg) return newargs diff --git a/adodbapi/test/adodbapitest.py b/adodbapi/test/adodbapitest.py index d049e173ea..f890541a43 100644 --- a/adodbapi/test/adodbapitest.py +++ b/adodbapi/test/adodbapitest.py @@ -46,9 +46,6 @@ from adodbapi import ado_consts -long = int - - def randomstring(length): return "".join([random.choice(string.ascii_letters) for n in range(32)]) diff --git a/adodbapi/test/dbapi20.py b/adodbapi/test/dbapi20.py index 2cd8335776..deeba209ab 100644 --- a/adodbapi/test/dbapi20.py +++ b/adodbapi/test/dbapi20.py @@ -18,19 +18,6 @@ import time import unittest -if sys.version[0] >= "3": # Python 3 - _BaseException = Exception - - def _failUnless(self, expr, msg=None): - self.assertTrue(expr, msg) - -else: # Python 2 - from exceptions import Exception as _BaseException - - def _failUnless(self, expr, msg=None): - self.failUnless(expr, msg) ## deprecated since Python 2.6 - - # set this to "True" to follow API 2.0 to the letter TEST_FOR_NON_IDEMPOTENT_CLOSE = False @@ -166,7 +153,7 @@ def tearDown(self): pass finally: con.close() - except _BaseException: + except Exception: pass def _connect(self): @@ -194,7 +181,7 @@ def test_threadsafety(self): # Must exist threadsafety = self.driver.threadsafety # Must be a valid value - _failUnless(self, threadsafety in (0, 1, 2, 3)) + self.assertTrue(threadsafety in (0, 1, 2, 3)) except AttributeError: self.fail("Driver doesn't define threadsafety") @@ -203,8 +190,8 @@ def test_paramstyle(self): # Must exist paramstyle = self.driver.paramstyle # Must be a valid value - _failUnless( - self, paramstyle in ("qmark", "numeric", "named", "format", "pyformat") + self.assertTrue( + paramstyle in ("qmark", "numeric", "named", "format", "pyformat") ) except AttributeError: self.fail("Driver doesn't define paramstyle") @@ -219,13 +206,13 @@ def test_Exceptions(self): self.failUnless(issubclass(self.driver.Warning, Exception)) self.failUnless(issubclass(self.driver.Error, Exception)) - _failUnless(self, issubclass(self.driver.InterfaceError, self.driver.Error)) - _failUnless(self, issubclass(self.driver.DatabaseError, self.driver.Error)) - _failUnless(self, issubclass(self.driver.OperationalError, self.driver.Error)) - _failUnless(self, issubclass(self.driver.IntegrityError, self.driver.Error)) - _failUnless(self, issubclass(self.driver.InternalError, self.driver.Error)) - _failUnless(self, issubclass(self.driver.ProgrammingError, self.driver.Error)) - _failUnless(self, issubclass(self.driver.NotSupportedError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.InterfaceError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.DatabaseError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.OperationalError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.IntegrityError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.InternalError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.ProgrammingError, self.driver.Error)) + self.assertTrue(issubclass(self.driver.NotSupportedError, self.driver.Error)) def test_ExceptionsAsConnectionAttributes(self): # OPTIONAL EXTENSION @@ -236,15 +223,15 @@ def test_ExceptionsAsConnectionAttributes(self): # by default. con = self._connect() drv = self.driver - _failUnless(self, con.Warning is drv.Warning) - _failUnless(self, con.Error is drv.Error) - _failUnless(self, con.InterfaceError is drv.InterfaceError) - _failUnless(self, con.DatabaseError is drv.DatabaseError) - _failUnless(self, con.OperationalError is drv.OperationalError) - _failUnless(self, con.IntegrityError is drv.IntegrityError) - _failUnless(self, con.InternalError is drv.InternalError) - _failUnless(self, con.ProgrammingError is drv.ProgrammingError) - _failUnless(self, con.NotSupportedError is drv.NotSupportedError) + self.assertTrue(con.Warning is drv.Warning) + self.assertTrue(con.Error is drv.Error) + self.assertTrue(con.InterfaceError is drv.InterfaceError) + self.assertTrue(con.DatabaseError is drv.DatabaseError) + self.assertTrue(con.OperationalError is drv.OperationalError) + self.assertTrue(con.IntegrityError is drv.IntegrityError) + self.assertTrue(con.InternalError is drv.InternalError) + self.assertTrue(con.ProgrammingError is drv.ProgrammingError) + self.assertTrue(con.NotSupportedError is drv.NotSupportedError) def test_commit(self): con = self._connect() @@ -338,8 +325,7 @@ def test_rowcount(self): try: cur = con.cursor() self.executeDDL1(cur) - _failUnless( - self, + self.assertTrue( cur.rowcount in (-1, 0), # Bug #543885 "cursor.rowcount should be -1 or 0 after executing no-result " "statements", @@ -347,15 +333,13 @@ def test_rowcount(self): cur.execute( "insert into %sbooze values ('Victoria Bitter')" % (self.table_prefix) ) - _failUnless( - self, + self.assertTrue( cur.rowcount in (-1, 1), "cursor.rowcount should == number or rows inserted, or " "set to -1 after executing an insert statement", ) cur.execute("select name from %sbooze" % self.table_prefix) - _failUnless( - self, + self.assertTrue( cur.rowcount in (-1, 1), "cursor.rowcount should == number of rows returned, or " "set to -1 after executing a select statement", @@ -425,7 +409,7 @@ def _paraminsert(self, cur): "insert into %sbarflys values ('Victoria Bitter', 'thi%%s :may ca%%(u)se? troub:1e')" % (self.table_prefix) ) - _failUnless(self, cur.rowcount in (-1, 1)) + self.assertTrue(cur.rowcount in (-1, 1)) if self.driver.paramstyle == "qmark": cur.execute( @@ -459,7 +443,7 @@ def _paraminsert(self, cur): ) else: self.fail("Invalid paramstyle") - _failUnless(self, cur.rowcount in (-1, 1)) + self.assertTrue(cur.rowcount in (-1, 1)) cur.execute("select name, drink from %sbarflys" % self.table_prefix) res = cur.fetchall() @@ -520,8 +504,7 @@ def test_executemany(self): ) else: self.fail("Unknown paramstyle") - _failUnless( - self, + self.assertTrue( cur.rowcount in (-1, 2), "insert using cursor.executemany set cursor.rowcount to " "incorrect value %r" % cur.rowcount, @@ -560,7 +543,7 @@ def test_fetchone(self): None, "cursor.fetchone should return None if a query retrieves " "no rows", ) - _failUnless(self, cur.rowcount in (-1, 0)) + self.assertTrue(cur.rowcount in (-1, 0)) # cursor.fetchone should raise an Error if called after # executing a query that cannnot return rows @@ -582,7 +565,7 @@ def test_fetchone(self): None, "cursor.fetchone should return None if no more rows available", ) - _failUnless(self, cur.rowcount in (-1, 1)) + self.assertTrue(cur.rowcount in (-1, 1)) finally: con.close() @@ -642,7 +625,7 @@ def test_fetchmany(self): "cursor.fetchmany should return an empty sequence after " "results are exhausted", ) - _failUnless(self, cur.rowcount in (-1, 6)) + self.assertTrue(cur.rowcount in (-1, 6)) # Same as above, using cursor.arraysize cur.arraysize = 4 @@ -655,12 +638,12 @@ def test_fetchmany(self): self.assertEqual(len(r), 2) r = cur.fetchmany() # Should be an empty sequence self.assertEqual(len(r), 0) - _failUnless(self, cur.rowcount in (-1, 6)) + self.assertTrue(cur.rowcount in (-1, 6)) cur.arraysize = 6 cur.execute("select name from %sbooze" % self.table_prefix) rows = cur.fetchmany() # Should get all rows - _failUnless(self, cur.rowcount in (-1, 6)) + self.assertTrue(cur.rowcount in (-1, 6)) self.assertEqual(len(rows), 6) self.assertEqual(len(rows), 6) rows = [r[0] for r in rows] @@ -681,7 +664,7 @@ def test_fetchmany(self): "cursor.fetchmany should return an empty sequence if " "called after the whole result set has been fetched", ) - _failUnless(self, cur.rowcount in (-1, 6)) + self.assertTrue(cur.rowcount in (-1, 6)) self.executeDDL2(cur) cur.execute("select name from %sbarflys" % self.table_prefix) @@ -692,7 +675,7 @@ def test_fetchmany(self): "cursor.fetchmany should return an empty sequence if " "query retrieved no rows", ) - _failUnless(self, cur.rowcount in (-1, 0)) + self.assertTrue(cur.rowcount in (-1, 0)) finally: con.close() @@ -716,7 +699,7 @@ def test_fetchall(self): cur.execute("select name from %sbooze" % self.table_prefix) rows = cur.fetchall() - _failUnless(self, cur.rowcount in (-1, len(self.samples))) + self.assertTrue(cur.rowcount in (-1, len(self.samples))) self.assertEqual( len(rows), len(self.samples), @@ -735,12 +718,12 @@ def test_fetchall(self): "cursor.fetchall should return an empty list if called " "after the whole result set has been fetched", ) - _failUnless(self, cur.rowcount in (-1, len(self.samples))) + self.assertTrue(cur.rowcount in (-1, len(self.samples))) self.executeDDL2(cur) cur.execute("select name from %sbarflys" % self.table_prefix) rows = cur.fetchall() - _failUnless(self, cur.rowcount in (-1, 0)) + self.assertTrue(cur.rowcount in (-1, 0)) self.assertEqual( len(rows), 0, @@ -764,7 +747,7 @@ def test_mixedfetch(self): rows23 = cur.fetchmany(2) rows4 = cur.fetchone() rows56 = cur.fetchall() - _failUnless(self, cur.rowcount in (-1, 6)) + self.assertTrue(cur.rowcount in (-1, 6)) self.assertEqual( len(rows23), 2, "fetchmany returned incorrect number of rows" ) @@ -812,8 +795,8 @@ def test_arraysize(self): con = self._connect() try: cur = con.cursor() - _failUnless( - self, hasattr(cur, "arraysize"), "cursor.arraysize must be defined" + self.assertTrue( + hasattr(cur, "arraysize"), "cursor.arraysize must be defined" ) finally: con.close() @@ -881,26 +864,22 @@ def test_Binary(self): b = self.driver.Binary(b"") def test_STRING(self): - _failUnless( - self, hasattr(self.driver, "STRING"), "module.STRING must be defined" - ) + self.assertTrue(hasattr(self.driver, "STRING"), "module.STRING must be defined") def test_BINARY(self): - _failUnless( - self, hasattr(self.driver, "BINARY"), "module.BINARY must be defined." + self.assertTrue( + hasattr(self.driver, "BINARY"), "module.BINARY must be defined." ) def test_NUMBER(self): - _failUnless( - self, hasattr(self.driver, "NUMBER"), "module.NUMBER must be defined." + self.assertTrue( + hasattr(self.driver, "NUMBER"), "module.NUMBER must be defined." ) def test_DATETIME(self): - _failUnless( - self, hasattr(self.driver, "DATETIME"), "module.DATETIME must be defined." + self.assertTrue( + hasattr(self.driver, "DATETIME"), "module.DATETIME must be defined." ) def test_ROWID(self): - _failUnless( - self, hasattr(self.driver, "ROWID"), "module.ROWID must be defined." - ) + self.assertTrue(hasattr(self.driver, "ROWID"), "module.ROWID must be defined.") diff --git a/adodbapi/test/setuptestframework.py b/adodbapi/test/setuptestframework.py index 91c46e0889..cb27a63042 100644 --- a/adodbapi/test/setuptestframework.py +++ b/adodbapi/test/setuptestframework.py @@ -47,7 +47,7 @@ def makeadopackage(testfolder): newpackage = os.path.join(testfolder, "adodbapi") try: os.mkdir(newpackage) - except (WindowsError, OSError): + except OSError: print( "*Note: temporary adodbapi package already exists: may be two versions running?" ) @@ -63,7 +63,7 @@ def makeadopackage(testfolder): sys.stdout = save return testfolder else: - raise EnvironmentError("Connot find source of adodbapi to test.") + raise OSError("Connot find source of adodbapi to test.") def makemdb(testfolder, mdb_name):