diff --git a/google/cloud/spanner_dbapi/client_side_statement_executor.py b/google/cloud/spanner_dbapi/client_side_statement_executor.py index f65e8ada1a..4ef43e9d74 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_executor.py +++ b/google/cloud/spanner_dbapi/client_side_statement_executor.py @@ -11,19 +11,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from google.cloud.spanner_dbapi import Connection from google.cloud.spanner_dbapi.parsed_statement import ( ParsedStatement, ClientSideStatementType, ) -def execute(connection, parsed_statement: ParsedStatement): +def execute(connection: "Connection", parsed_statement: ParsedStatement): """Executes the client side statements by calling the relevant method. It is an internal method that can make backwards-incompatible changes. + :type connection: Connection + :param connection: Connection object of the dbApi + :type parsed_statement: ParsedStatement :param parsed_statement: parsed_statement based on the sql query """ if parsed_statement.client_side_statement_type == ClientSideStatementType.COMMIT: return connection.commit() + if parsed_statement.client_side_statement_type == ClientSideStatementType.BEGIN: + return connection.begin() + if parsed_statement.client_side_statement_type == ClientSideStatementType.ROLLBACK: + return connection.rollback() diff --git a/google/cloud/spanner_dbapi/client_side_statement_parser.py b/google/cloud/spanner_dbapi/client_side_statement_parser.py index e93b71f3e1..ce1474e809 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_parser.py +++ b/google/cloud/spanner_dbapi/client_side_statement_parser.py @@ -20,7 +20,9 @@ ClientSideStatementType, ) +RE_BEGIN = re.compile(r"^\s*(BEGIN|START)(TRANSACTION)?", re.IGNORECASE) RE_COMMIT = re.compile(r"^\s*(COMMIT)(TRANSACTION)?", re.IGNORECASE) +RE_ROLLBACK = re.compile(r"^\s*(ROLLBACK)(TRANSACTION)?", re.IGNORECASE) def parse_stmt(query): @@ -39,4 +41,12 @@ def parse_stmt(query): return ParsedStatement( StatementType.CLIENT_SIDE, query, ClientSideStatementType.COMMIT ) + if RE_BEGIN.match(query): + return ParsedStatement( + StatementType.CLIENT_SIDE, query, ClientSideStatementType.BEGIN + ) + if RE_ROLLBACK.match(query): + return ParsedStatement( + StatementType.CLIENT_SIDE, query, ClientSideStatementType.ROLLBACK + ) return None diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index efbdc80f3f..a3306b316c 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -34,7 +34,9 @@ from google.rpc.code_pb2 import ABORTED -AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" +CLIENT_TRANSACTION_NOT_STARTED_WARNING = ( + "This method is non-operational as transaction has not started" +) MAX_INTERNAL_RETRIES = 50 @@ -104,6 +106,7 @@ def __init__(self, instance, database=None, read_only=False): self._read_only = read_only self._staleness = None self.request_priority = None + self._transaction_begin_marked = False @property def autocommit(self): @@ -122,7 +125,7 @@ def autocommit(self, value): :type value: bool :param value: New autocommit mode state. """ - if value and not self._autocommit and self.inside_transaction: + if value and not self._autocommit and self._spanner_transaction_started: self.commit() self._autocommit = value @@ -137,17 +140,35 @@ def database(self): return self._database @property - def inside_transaction(self): - """Flag: transaction is started. + def _spanner_transaction_started(self): + """Flag: whether transaction started at Spanner. This means that we had + made atleast one call to Spanner. Property client_transaction_started + would always be true if this is true as transaction has to start first + at clientside than at Spanner Returns: - bool: True if transaction begun, False otherwise. + bool: True if Spanner transaction started, False otherwise. """ return ( self._transaction and not self._transaction.committed and not self._transaction.rolled_back - ) + ) or (self._snapshot is not None) + + @property + def inside_transaction(self): + """Deprecated property which won't be supported in future versions. + Please use spanner_transaction_started property instead.""" + return self._spanner_transaction_started + + @property + def _client_transaction_started(self): + """Flag: whether transaction started at client side. + + Returns: + bool: True if transaction started, False otherwise. + """ + return (not self._autocommit) or self._transaction_begin_marked @property def instance(self): @@ -175,7 +196,7 @@ def read_only(self, value): Args: value (bool): True for ReadOnly mode, False for ReadWrite. """ - if self.inside_transaction: + if self._spanner_transaction_started: raise ValueError( "Connection read/write mode can't be changed while a transaction is in progress. " "Commit or rollback the current transaction and try again." @@ -213,7 +234,7 @@ def staleness(self, value): Args: value (dict): Staleness type and value. """ - if self.inside_transaction: + if self._spanner_transaction_started: raise ValueError( "`staleness` option can't be changed while a transaction is in progress. " "Commit or rollback the current transaction and try again." @@ -331,15 +352,16 @@ def transaction_checkout(self): """Get a Cloud Spanner transaction. Begin a new transaction, if there is no transaction in - this connection yet. Return the begun one otherwise. + this connection yet. Return the started one otherwise. - The method is non operational in autocommit mode. + This method is a no-op if the connection is in autocommit mode and no + explicit transaction has been started :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """ - if not self.autocommit: - if not self.inside_transaction: + if not self.read_only and self._client_transaction_started: + if not self._spanner_transaction_started: self._transaction = self._session_checkout().transaction() self._transaction.begin() @@ -354,7 +376,7 @@ def snapshot_checkout(self): :rtype: :class:`google.cloud.spanner_v1.snapshot.Snapshot` :returns: A Cloud Spanner snapshot object, ready to use. """ - if self.read_only and not self.autocommit: + if self.read_only and self._client_transaction_started: if not self._snapshot: self._snapshot = Snapshot( self._session_checkout(), multi_use=True, **self.staleness @@ -369,7 +391,7 @@ def close(self): The connection will be unusable from this point forward. If the connection has an active transaction, it will be rolled back. """ - if self.inside_transaction: + if self._spanner_transaction_started and not self.read_only: self._transaction.rollback() if self._own_pool and self.database: @@ -377,27 +399,47 @@ def close(self): self.is_closed = True + @check_not_closed + def begin(self): + """ + Marks the transaction as started. + + :raises: :class:`InterfaceError`: if this connection is closed. + :raises: :class:`OperationalError`: if there is an existing transaction that has begin or is running + """ + if self._transaction_begin_marked: + raise OperationalError("A transaction has already started") + if self._spanner_transaction_started: + raise OperationalError( + "Beginning a new transaction is not allowed when a transaction is already running" + ) + self._transaction_begin_marked = True + def commit(self): """Commits any pending transaction to the database. - This method is non-operational in autocommit mode. + This is a no-op if there is no active client transaction. """ if self.database is None: raise ValueError("Database needs to be passed for this operation") - self._snapshot = None - if self._autocommit: - warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + if not self._client_transaction_started: + warnings.warn( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 + ) return self.run_prior_DDL_statements() - if self.inside_transaction: + if self._spanner_transaction_started: try: - if not self.read_only: + if self.read_only: + self._snapshot = None + else: self._transaction.commit() self._release_session() self._statements = [] + self._transaction_begin_marked = False except Aborted: self.retry_transaction() self.commit() @@ -405,19 +447,24 @@ def commit(self): def rollback(self): """Rolls back any pending transaction. - This is a no-op if there is no active transaction or if the connection - is in autocommit mode. + This is a no-op if there is no active client transaction. """ - self._snapshot = None - if self._autocommit: - warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) - elif self._transaction: - if not self.read_only: + if not self._client_transaction_started: + warnings.warn( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 + ) + return + + if self._spanner_transaction_started: + if self.read_only: + self._snapshot = None + else: self._transaction.rollback() self._release_session() self._statements = [] + self._transaction_begin_marked = False @check_not_closed def cursor(self): diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 95d20f5730..023149eeb0 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -250,7 +250,7 @@ def execute(self, sql, args=None): ) if parsed_statement.statement_type == StatementType.DDL: self._batch_DDLs(sql) - if self.connection.autocommit: + if not self.connection._client_transaction_started: self.connection.run_prior_DDL_statements() return @@ -264,7 +264,7 @@ def execute(self, sql, args=None): sql, args = sql_pyformat_args_to_spanner(sql, args or None) - if not self.connection.autocommit: + if self.connection._client_transaction_started: statement = Statement( sql, args, @@ -348,7 +348,7 @@ def executemany(self, operation, seq_of_params): ) statements.append((sql, params, get_param_types(params))) - if self.connection.autocommit: + if not self.connection._client_transaction_started: self.connection.database.run_in_transaction( self._do_batch_update, statements, many_result_set ) @@ -396,7 +396,10 @@ def fetchone(self): sequence, or None when no more data is available.""" try: res = next(self) - if not self.connection.autocommit and not self.connection.read_only: + if ( + self.connection._client_transaction_started + and not self.connection.read_only + ): self._checksum.consume_result(res) return res except StopIteration: @@ -414,7 +417,10 @@ def fetchall(self): res = [] try: for row in self: - if not self.connection.autocommit and not self.connection.read_only: + if ( + self.connection._client_transaction_started + and not self.connection.read_only + ): self._checksum.consume_result(row) res.append(row) except Aborted: @@ -443,7 +449,10 @@ def fetchmany(self, size=None): for _ in range(size): try: res = next(self) - if not self.connection.autocommit and not self.connection.read_only: + if ( + self.connection._client_transaction_started + and not self.connection.read_only + ): self._checksum.consume_result(res) items.append(res) except StopIteration: @@ -473,7 +482,7 @@ def _handle_DQL(self, sql, params): if self.connection.database is None: raise ValueError("Database needs to be passed for this operation") sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params) - if self.connection.read_only and not self.connection.autocommit: + if self.connection.read_only and self.connection._client_transaction_started: # initiate or use the existing multi-use snapshot self._handle_DQL_with_snapshot( self.connection.snapshot_checkout(), sql, params diff --git a/google/cloud/spanner_dbapi/parsed_statement.py b/google/cloud/spanner_dbapi/parsed_statement.py index c36bc1d81c..28705b69ed 100644 --- a/google/cloud/spanner_dbapi/parsed_statement.py +++ b/google/cloud/spanner_dbapi/parsed_statement.py @@ -27,6 +27,7 @@ class StatementType(Enum): class ClientSideStatementType(Enum): COMMIT = 1 BEGIN = 2 + ROLLBACK = 3 @dataclass diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py index bd49e478ba..26af9e5e0f 100644 --- a/tests/system/test_dbapi.py +++ b/tests/system/test_dbapi.py @@ -21,10 +21,8 @@ from google.cloud import spanner_v1 from google.cloud._helpers import UTC -from google.cloud.spanner_dbapi import Cursor -from google.cloud.spanner_dbapi.connection import connect -from google.cloud.spanner_dbapi.connection import Connection -from google.cloud.spanner_dbapi.exceptions import ProgrammingError +from google.cloud.spanner_dbapi.connection import Connection, connect +from google.cloud.spanner_dbapi.exceptions import ProgrammingError, OperationalError from google.cloud.spanner_v1 import JsonObject from google.cloud.spanner_v1 import gapic_version as package_version from . import _helpers @@ -44,10 +42,10 @@ @pytest.fixture(scope="session") def raw_database(shared_instance, database_operation_timeout, not_postgres): - databse_id = _helpers.unique_id("dbapi-txn") + database_id = _helpers.unique_id("dbapi-txn") pool = spanner_v1.BurstyPool(labels={"testcase": "database_api"}) database = shared_instance.database( - databse_id, + database_id, ddl_statements=DDL_STATEMENTS, pool=pool, ) @@ -59,779 +57,746 @@ def raw_database(shared_instance, database_operation_timeout, not_postgres): database.drop() -def clear_table(transaction): - transaction.execute_update("DELETE FROM contacts WHERE true") +class TestDbApi: + @staticmethod + def clear_table(transaction): + transaction.execute_update("DELETE FROM contacts WHERE true") + @pytest.fixture(scope="function") + def dbapi_database(self, raw_database): + raw_database.run_in_transaction(self.clear_table) -@pytest.fixture(scope="function") -def dbapi_database(raw_database): - raw_database.run_in_transaction(clear_table) + yield raw_database - yield raw_database + raw_database.run_in_transaction(self.clear_table) - raw_database.run_in_transaction(clear_table) + @pytest.fixture(autouse=True) + def init_connection(self, request, shared_instance, dbapi_database): + if "noautofixt" not in request.keywords: + self._conn = Connection(shared_instance, dbapi_database) + self._cursor = self._conn.cursor() + yield + if "noautofixt" not in request.keywords: + self._cursor.close() + self._conn.close() + def _execute_common_statements(self, cursor): + # execute several DML statements within one transaction + cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' + """ + ) + cursor.execute( + """ + UPDATE contacts + SET email = 'test.email_updated@domen.ru' + WHERE email = 'test.email@domen.ru' + """ + ) + return ( + 1, + "updated-first-name", + "last-name", + "test.email_updated@domen.ru", + ) -def test_commit(shared_instance, dbapi_database): - """Test committing a transaction with several statements.""" - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - want_row = _execute_common_precommit_statements(cursor) - conn.commit() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - assert got_rows == [want_row] - - cursor.close() - conn.close() - - -def test_commit_client_side(shared_instance, dbapi_database): - """Test committing a transaction with several statements.""" - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - want_row = _execute_common_precommit_statements(cursor) - cursor.execute("""COMMIT""") - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - cursor.close() - conn.close() - - assert got_rows == [want_row] - + @pytest.mark.parametrize("client_side", [False, True]) + def test_commit(self, client_side): + """Test committing a transaction with several statements.""" + updated_row = self._execute_common_statements(self._cursor) + if client_side: + self._cursor.execute("""COMMIT""") + else: + self._conn.commit() + + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + self._conn.commit() + + assert got_rows == [updated_row] + + @pytest.mark.noautofixt + def test_begin_client_side(self, shared_instance, dbapi_database): + """Test beginning a transaction using client side statement, + where connection is in autocommit mode.""" + + conn1 = Connection(shared_instance, dbapi_database) + conn1.autocommit = True + cursor1 = conn1.cursor() + cursor1.execute("begin transaction") + updated_row = self._execute_common_statements(cursor1) + + assert conn1._transaction_begin_marked is True + conn1.commit() + assert conn1._transaction_begin_marked is False + cursor1.close() + conn1.close() + + # As the connection conn1 is committed a new connection should see its results + conn3 = Connection(shared_instance, dbapi_database) + cursor3 = conn3.cursor() + cursor3.execute("SELECT * FROM contacts") + conn3.commit() + got_rows = cursor3.fetchall() + cursor3.close() + conn3.close() + assert got_rows == [updated_row] + + def test_begin_success_post_commit(self): + """Test beginning a new transaction post commiting an existing transaction + is possible on a connection, when connection is in autocommit mode.""" + want_row = (2, "first-name", "last-name", "test.email@domen.ru") + self._conn.autocommit = True + self._cursor.execute("begin transaction") + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + self._conn.commit() + + self._cursor.execute("begin transaction") + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + self._conn.commit() + assert got_rows == [want_row] + + def test_begin_error_before_commit(self): + """Test beginning a new transaction before commiting an existing transaction is not possible on a connection, when connection is in autocommit mode.""" + self._conn.autocommit = True + self._cursor.execute("begin transaction") + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) -def test_rollback(shared_instance, dbapi_database): - """Test rollbacking a transaction with several statements.""" - want_row = (2, "first-name", "last-name", "test.email@domen.ru") - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + with pytest.raises(OperationalError): + self._cursor.execute("begin transaction") - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - conn.commit() + @pytest.mark.parametrize("client_side", [False, True]) + def test_rollback(self, client_side): + """Test rollbacking a transaction with several statements.""" + want_row = (2, "first-name", "last-name", "test.email@domen.ru") - # execute several DMLs with one transaction - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - cursor.execute( + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') """ -UPDATE contacts -SET email = 'test.email_updated@domen.ru' -WHERE email = 'test.email@domen.ru' -""" - ) - conn.rollback() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - assert got_rows == [want_row] - - cursor.close() - conn.close() + ) + self._conn.commit() + # execute several DMLs with one transaction + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' + """ + ) + self._cursor.execute( + """ + UPDATE contacts + SET email = 'test.email_updated@domen.ru' + WHERE email = 'test.email@domen.ru' + """ + ) -def test_autocommit_mode_change(shared_instance, dbapi_database): - """Test auto committing a transaction on `autocommit` mode change.""" - want_row = ( - 2, - "updated-first-name", - "last-name", - "test.email@domen.ru", - ) - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + if client_side: + self._cursor.execute("ROLLBACK") + else: + self._conn.rollback() + + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() + self._conn.commit() + + assert got_rows == [want_row] + + def test_autocommit_mode_change(self): + """Test auto committing a transaction on `autocommit` mode change.""" + want_row = ( + 2, + "updated-first-name", + "last-name", + "test.email@domen.ru", + ) - cursor.execute( + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + ) + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' """ - ) - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - conn.autocommit = True - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - - assert got_rows == [want_row] - - cursor.close() - conn.close() + ) + self._conn.autocommit = True + # read the resulting data from the database + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() -def test_rollback_on_connection_closing(shared_instance, dbapi_database): - """ - When closing a connection all the pending transactions - must be rollbacked. Testing if it's working this way. - """ - want_row = (1, "first-name", "last-name", "test.email@domen.ru") - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + assert got_rows == [want_row] - cursor.execute( + @pytest.mark.noautofixt + def test_rollback_on_connection_closing(self, shared_instance, dbapi_database): """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - conn.commit() - - cursor.execute( + When closing a connection all the pending transactions + must be rollbacked. Testing if it's working this way. """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - conn.close() - - # connect again, as the previous connection is no-op after closing - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - assert got_rows == [want_row] - - cursor.close() - conn.close() - - -def test_results_checksum(shared_instance, dbapi_database): - """Test that results checksum is calculated properly.""" - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + want_row = (1, "first-name", "last-name", "test.email@domen.ru") + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() - cursor.execute( + cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES -(1, 'first-name', 'last-name', 'test.email@domen.ru'), -(2, 'first-name2', 'last-name2', 'test.email2@domen.ru') - """ - ) - assert len(conn._statements) == 1 - conn.commit() + ) + conn.commit() - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() + cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' + """ + ) + conn.close() - assert len(conn._statements) == 1 - conn.commit() + # connect again, as the previous connection is no-op after closing + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() - checksum = hashlib.sha256() - checksum.update(pickle.dumps(got_rows[0])) - checksum.update(pickle.dumps(got_rows[1])) + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() - assert cursor._checksum.checksum.digest() == checksum.digest() + assert got_rows == [want_row] + cursor.close() + conn.close() -def test_execute_many(shared_instance, dbapi_database): - # connect to the test database - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() + def test_results_checksum(self): + """Test that results checksum is calculated properly.""" - row_data = [ - (1, "first-name", "last-name", "test.email@example.com"), - (2, "first-name2", "last-name2", "test.email2@example.com"), - ] - cursor.executemany( + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES + (1, 'first-name', 'last-name', 'test.email@domen.ru'), + (2, 'first-name2', 'last-name2', 'test.email2@domen.ru') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (%s, %s, %s, %s) - """, - row_data, - ) - conn.commit() + ) + assert len(self._conn._statements) == 1 + self._conn.commit() - cursor.executemany( - """SELECT * FROM contacts WHERE contact_id = %s""", - ((1,), (2,)), - ) - res = cursor.fetchall() - conn.commit() + self._cursor.execute("SELECT * FROM contacts") + got_rows = self._cursor.fetchall() - assert len(res) == len(row_data) - for found, expected in zip(res, row_data): - assert found[0] == expected[0] + assert len(self._conn._statements) == 1 + self._conn.commit() - # checking that execute() and executemany() - # results are not mixed together - cursor.execute( - """ -SELECT * FROM contacts WHERE contact_id = 1 -""", - ) - res = cursor.fetchone() - conn.commit() + checksum = hashlib.sha256() + checksum.update(pickle.dumps(got_rows[0])) + checksum.update(pickle.dumps(got_rows[1])) - assert res[0] == 1 - conn.close() + assert self._cursor._checksum.checksum.digest() == checksum.digest() + def test_execute_many(self): + row_data = [ + (1, "first-name", "last-name", "test.email@example.com"), + (2, "first-name2", "last-name2", "test.email2@example.com"), + ] + self._cursor.executemany( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (%s, %s, %s, %s) + """, + row_data, + ) + self._conn.commit() -def test_DDL_autocommit(shared_instance, dbapi_database): - """Check that DDLs in autocommit mode are immediately executed.""" + self._cursor.executemany( + """SELECT * FROM contacts WHERE contact_id = %s""", + ((1,), (2,)), + ) + res = self._cursor.fetchall() + self._conn.commit() - try: - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True + assert len(res) == len(row_data) + for found, expected in zip(res, row_data): + assert found[0] == expected[0] - cur = conn.cursor() - cur.execute( + # checking that execute() and executemany() + # results are not mixed together + self._cursor.execute( """ - CREATE TABLE Singers ( + SELECT * FROM contacts WHERE contact_id = 1 + """, + ) + res = self._cursor.fetchone() + self._conn.commit() + + assert res[0] == 1 + + @pytest.mark.noautofixt + def test_DDL_autocommit(self, shared_instance, dbapi_database): + """Check that DDLs in autocommit mode are immediately executed.""" + + try: + conn = Connection(shared_instance, dbapi_database) + conn.autocommit = True + + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """ + ) + conn.close() + + # if previous DDL wasn't committed, the next DROP TABLE + # statement will fail with a ProgrammingError + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute("DROP TABLE Singers") + conn.commit() + finally: + # Delete table + table = dbapi_database.table("Singers") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE Singers"]) + op.result() + + def test_ddl_execute_autocommit_true(self, dbapi_database): + """Check that DDL statement in autocommit mode results in successful + DDL statement execution for execute method.""" + + self._conn.autocommit = True + self._cursor.execute( + """ + CREATE TABLE DdlExecuteAutocommit ( SingerId INT64 NOT NULL, Name STRING(1024), ) PRIMARY KEY (SingerId) - """ + """ ) - conn.close() - - # if previous DDL wasn't committed, the next DROP TABLE - # statement will fail with a ProgrammingError - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - - cur.execute("DROP TABLE Singers") - conn.commit() - finally: - # Delete table - table = dbapi_database.table("Singers") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE Singers"]) - op.result() - - -def test_ddl_execute_autocommit_true(shared_instance, dbapi_database): - """Check that DDL statement in autocommit mode results in successful - DDL statement execution for execute method.""" - - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE DdlExecuteAutocommit ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - table = dbapi_database.table("DdlExecuteAutocommit") - assert table.exists() is True - - cur.close() - conn.close() - - -def test_ddl_executemany_autocommit_true(shared_instance, dbapi_database): - """Check that DDL statement in autocommit mode results in exception for - executemany method .""" - - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True - cur = conn.cursor() - with pytest.raises(ProgrammingError): - cur.executemany( + table = dbapi_database.table("DdlExecuteAutocommit") + assert table.exists() is True + + def test_ddl_executemany_autocommit_true(self, dbapi_database): + """Check that DDL statement in autocommit mode results in exception for + executemany method .""" + + self._conn.autocommit = True + with pytest.raises(ProgrammingError): + self._cursor.executemany( + """ + CREATE TABLE DdlExecuteManyAutocommit ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """, + [], + ) + table = dbapi_database.table("DdlExecuteManyAutocommit") + assert table.exists() is False + + def test_ddl_executemany_autocommit_false(self, dbapi_database): + """Check that DDL statement in non-autocommit mode results in exception for + executemany method .""" + with pytest.raises(ProgrammingError): + self._cursor.executemany( + """ + CREATE TABLE DdlExecuteManyAutocommit ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """, + [], + ) + table = dbapi_database.table("DdlExecuteManyAutocommit") + assert table.exists() is False + + def test_ddl_execute(self, dbapi_database): + """Check that DDL statement followed by non-DDL execute statement in + non autocommit mode results in successful DDL statement execution.""" + + want_row = ( + 1, + "first-name", + ) + self._cursor.execute( """ - CREATE TABLE DdlExecuteManyAutocommit ( + CREATE TABLE DdlExecute ( SingerId INT64 NOT NULL, Name STRING(1024), ) PRIMARY KEY (SingerId) - """, - [], + """ + ) + table = dbapi_database.table("DdlExecute") + assert table.exists() is False + + self._cursor.execute( + """ + INSERT INTO DdlExecute (SingerId, Name) + VALUES (1, "first-name") + """ ) - table = dbapi_database.table("DdlExecuteManyAutocommit") - assert table.exists() is False + assert table.exists() is True + self._conn.commit() - cur.close() - conn.close() + # read the resulting data from the database + self._cursor.execute("SELECT * FROM DdlExecute") + got_rows = self._cursor.fetchall() + assert got_rows == [want_row] -def test_ddl_executemany_autocommit_false(shared_instance, dbapi_database): - """Check that DDL statement in non-autocommit mode results in exception for - executemany method .""" + def test_ddl_executemany(self, dbapi_database): + """Check that DDL statement followed by non-DDL executemany statement in + non autocommit mode results in successful DDL statement execution.""" - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - with pytest.raises(ProgrammingError): - cur.executemany( + want_row = ( + 1, + "first-name", + ) + self._cursor.execute( """ - CREATE TABLE DdlExecuteManyAutocommit ( + CREATE TABLE DdlExecuteMany ( SingerId INT64 NOT NULL, Name STRING(1024), ) PRIMARY KEY (SingerId) - """, - [], + """ ) - table = dbapi_database.table("DdlExecuteManyAutocommit") - assert table.exists() is False - - cur.close() - conn.close() + table = dbapi_database.table("DdlExecuteMany") + assert table.exists() is False + self._cursor.executemany( + """ + INSERT INTO DdlExecuteMany (SingerId, Name) + VALUES (%s, %s) + """, + [want_row], + ) + assert table.exists() is True + self._conn.commit() -def test_ddl_execute(shared_instance, dbapi_database): - """Check that DDL statement followed by non-DDL execute statement in - non autocommit mode results in successful DDL statement execution.""" + # read the resulting data from the database + self._cursor.execute("SELECT * FROM DdlExecuteMany") + got_rows = self._cursor.fetchall() - conn = Connection(shared_instance, dbapi_database) - want_row = ( - 1, - "first-name", - ) - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE DdlExecute ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - table = dbapi_database.table("DdlExecute") - assert table.exists() is False + assert got_rows == [want_row] - cur.execute( + @pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") + def test_autocommit_with_json_data(self, dbapi_database): """ - INSERT INTO DdlExecute (SingerId, Name) - VALUES (1, "first-name") + Check that DDLs in autocommit mode are immediately + executed for json fields. """ - ) - assert table.exists() is True - conn.commit() - - # read the resulting data from the database - cur.execute("SELECT * FROM DdlExecute") - got_rows = cur.fetchall() - - assert got_rows == [want_row] - - cur.close() - conn.close() - - -def test_ddl_executemany(shared_instance, dbapi_database): - """Check that DDL statement followed by non-DDL executemany statement in - non autocommit mode results in successful DDL statement execution.""" + try: + self._conn.autocommit = True + self._cursor.execute( + """ + CREATE TABLE JsonDetails ( + DataId INT64 NOT NULL, + Details JSON, + ) PRIMARY KEY (DataId) + """ + ) + + # Insert data to table + self._cursor.execute( + sql="INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", + args=(123, JsonObject({"name": "Jakob", "age": "26"})), + ) + + # Read back the data. + self._cursor.execute("""select * from JsonDetails;""") + got_rows = self._cursor.fetchall() + + # Assert the response + assert len(got_rows) == 1 + assert got_rows[0][0] == 123 + assert got_rows[0][1] == {"age": "26", "name": "Jakob"} + + # Drop the table + self._cursor.execute("DROP TABLE JsonDetails") + self._conn.commit() + finally: + # Delete table + table = dbapi_database.table("JsonDetails") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) + op.result() + + @pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") + def test_json_array(self, dbapi_database): + try: + # Create table + self._conn.autocommit = True + + self._cursor.execute( + """ + CREATE TABLE JsonDetails ( + DataId INT64 NOT NULL, + Details JSON, + ) PRIMARY KEY (DataId) + """ + ) + self._cursor.execute( + "INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", + [1, JsonObject([1, 2, 3])], + ) + + self._cursor.execute("SELECT * FROM JsonDetails WHERE DataId = 1") + row = self._cursor.fetchone() + assert isinstance(row[1], JsonObject) + assert row[1].serialize() == "[1,2,3]" + + self._cursor.execute("DROP TABLE JsonDetails") + finally: + # Delete table + table = dbapi_database.table("JsonDetails") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) + op.result() + + @pytest.mark.noautofixt + def test_DDL_commit(self, shared_instance, dbapi_database): + """Check that DDLs in commit mode are executed on calling `commit()`.""" + try: + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """ + ) + conn.commit() + conn.close() + + # if previous DDL wasn't committed, the next DROP TABLE + # statement will fail with a ProgrammingError + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute("DROP TABLE Singers") + conn.commit() + finally: + # Delete table + table = dbapi_database.table("Singers") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE Singers"]) + op.result() + + def test_ping(self): + """Check connection validation method.""" + self._conn.validate() + + @pytest.mark.noautofixt + def test_user_agent(self, shared_instance, dbapi_database): + """Check that DB API uses an appropriate user agent.""" + conn = connect(shared_instance.name, dbapi_database.name) + assert ( + conn.instance._client._client_info.user_agent + == "gl-dbapi/" + package_version.__version__ + ) + assert ( + conn.instance._client._client_info.client_library_version + == package_version.__version__ + ) - conn = Connection(shared_instance, dbapi_database) - want_row = ( - 1, - "first-name", - ) - cur = conn.cursor() - cur.execute( + def test_read_only(self): """ - CREATE TABLE DdlExecuteMany ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - table = dbapi_database.table("DdlExecuteMany") - assert table.exists() is False - - cur.executemany( + Check that connection set to `read_only=True` uses + ReadOnly transactions. """ - INSERT INTO DdlExecuteMany (SingerId, Name) - VALUES (%s, %s) - """, - [want_row], - ) - assert table.exists() is True - conn.commit() - - # read the resulting data from the database - cur.execute("SELECT * FROM DdlExecuteMany") - got_rows = cur.fetchall() - assert got_rows == [want_row] - - cur.close() - conn.close() - - -@pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") -def test_autocommit_with_json_data(shared_instance, dbapi_database): - """ - Check that DDLs in autocommit mode are immediately - executed for json fields. + self._conn.read_only = True + with pytest.raises(ProgrammingError): + self._cursor.execute( + """ + UPDATE contacts + SET first_name = 'updated-first-name' + WHERE first_name = 'first-name' """ - try: - # Create table - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True - - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE JsonDetails ( - DataId INT64 NOT NULL, - Details JSON, - ) PRIMARY KEY (DataId) - """ - ) - - # Insert data to table - cur.execute( - sql="INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", - args=(123, JsonObject({"name": "Jakob", "age": "26"})), - ) + ) - # Read back the data. - cur.execute("""select * from JsonDetails;""") - got_rows = cur.fetchall() + self._cursor.execute("SELECT * FROM contacts") + self._conn.commit() - # Assert the response - assert len(got_rows) == 1 - assert got_rows[0][0] == 123 - assert got_rows[0][1] == {"age": "26", "name": "Jakob"} + def test_staleness(self): + """Check the DB API `staleness` option.""" - # Drop the table - cur.execute("DROP TABLE JsonDetails") - conn.commit() - conn.close() - finally: - # Delete table - table = dbapi_database.table("JsonDetails") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) - op.result() - - -@pytest.mark.skipif(_helpers.USE_EMULATOR, reason="Emulator does not support json.") -def test_json_array(shared_instance, dbapi_database): - try: - # Create table - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = True + before_insert = datetime.datetime.utcnow().replace(tzinfo=UTC) + time.sleep(0.25) - cur = conn.cursor() - cur.execute( + self._cursor.execute( """ - CREATE TABLE JsonDetails ( - DataId INT64 NOT NULL, - Details JSON, - ) PRIMARY KEY (DataId) + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') """ ) - cur.execute( - "INSERT INTO JsonDetails (DataId, Details) VALUES (%s, %s)", - [1, JsonObject([1, 2, 3])], - ) - - cur.execute("SELECT * FROM JsonDetails WHERE DataId = 1") - row = cur.fetchone() - assert isinstance(row[1], JsonObject) - assert row[1].serialize() == "[1,2,3]" - - cur.execute("DROP TABLE JsonDetails") - conn.close() - finally: - # Delete table - table = dbapi_database.table("JsonDetails") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE JsonDetails"]) - op.result() - - -def test_DDL_commit(shared_instance, dbapi_database): - """Check that DDLs in commit mode are executed on calling `commit()`.""" - try: - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - - cur.execute( + self._conn.commit() + + self._conn.read_only = True + self._conn.staleness = {"read_timestamp": before_insert} + self._cursor.execute("SELECT * FROM contacts") + self._conn.commit() + assert len(self._cursor.fetchall()) == 0 + + self._conn.staleness = None + self._cursor.execute("SELECT * FROM contacts") + self._conn.commit() + assert len(self._cursor.fetchall()) == 1 + + @pytest.mark.parametrize("autocommit", [False, True]) + def test_rowcount(self, dbapi_database, autocommit): + try: + self._conn.autocommit = autocommit + + self._cursor.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """ + ) + self._conn.commit() + + # executemany sets rowcount to the total modified rows + rows = [(i, f"Singer {i}") for i in range(100)] + self._cursor.executemany( + "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s)", rows[:98] + ) + assert self._cursor.rowcount == 98 + + # execute with INSERT + self._cursor.execute( + "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s), (%s, %s)", + [x for row in rows[98:] for x in row], + ) + assert self._cursor.rowcount == 2 + + # execute with UPDATE + self._cursor.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") + assert self._cursor.rowcount == 25 + + # execute with SELECT + self._cursor.execute("SELECT Name FROM Singers WHERE SingerId < 75") + assert len(self._cursor.fetchall()) == 75 + # rowcount is not available for SELECT + assert self._cursor.rowcount == -1 + + # execute with DELETE + self._cursor.execute("DELETE FROM Singers") + assert self._cursor.rowcount == 100 + + # execute with UPDATE matching 0 rows + self._cursor.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") + assert self._cursor.rowcount == 0 + + self._conn.commit() + self._cursor.execute("DROP TABLE Singers") + self._conn.commit() + finally: + # Delete table + table = dbapi_database.table("Singers") + if table.exists(): + op = dbapi_database.update_ddl(["DROP TABLE Singers"]) + op.result() + + @pytest.mark.parametrize("autocommit", [False, True]) + @pytest.mark.skipif( + _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." + ) + def test_dml_returning_insert(self, autocommit): + self._conn.autocommit = autocommit + self._cursor.execute( """ - CREATE TABLE Singers ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') + THEN RETURN contact_id, first_name """ ) - conn.commit() - conn.close() + assert self._cursor.fetchone() == (1, "first-name") + assert self._cursor.rowcount == 1 + self._conn.commit() - # if previous DDL wasn't committed, the next DROP TABLE - # statement will fail with a ProgrammingError - conn = Connection(shared_instance, dbapi_database) - cur = conn.cursor() - - cur.execute("DROP TABLE Singers") - conn.commit() - finally: - # Delete table - table = dbapi_database.table("Singers") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE Singers"]) - op.result() - - -def test_ping(shared_instance, dbapi_database): - """Check connection validation method.""" - conn = Connection(shared_instance, dbapi_database) - conn.validate() - conn.close() - - -def test_user_agent(shared_instance, dbapi_database): - """Check that DB API uses an appropriate user agent.""" - conn = connect(shared_instance.name, dbapi_database.name) - assert ( - conn.instance._client._client_info.user_agent - == "gl-dbapi/" + package_version.__version__ - ) - assert ( - conn.instance._client._client_info.client_library_version - == package_version.__version__ + @pytest.mark.parametrize("autocommit", [False, True]) + @pytest.mark.skipif( + _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." ) - - -def test_read_only(shared_instance, dbapi_database): - """ - Check that connection set to `read_only=True` uses - ReadOnly transactions. - """ - conn = Connection(shared_instance, dbapi_database, read_only=True) - cur = conn.cursor() - - with pytest.raises(ProgrammingError): - cur.execute( + def test_dml_returning_update(self, autocommit): + self._conn.autocommit = autocommit + self._cursor.execute( """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - - cur.execute("SELECT * FROM contacts") - conn.commit() - - -def test_staleness(shared_instance, dbapi_database): - """Check the DB API `staleness` option.""" - conn = Connection(shared_instance, dbapi_database) - cursor = conn.cursor() - - before_insert = datetime.datetime.utcnow().replace(tzinfo=UTC) - time.sleep(0.25) - - cursor.execute( + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') - """ - ) - conn.commit() - - conn.read_only = True - conn.staleness = {"read_timestamp": before_insert} - cursor.execute("SELECT * FROM contacts") - conn.commit() - assert len(cursor.fetchall()) == 0 - - conn.staleness = None - cursor.execute("SELECT * FROM contacts") - conn.commit() - assert len(cursor.fetchall()) == 1 - - conn.close() - - -@pytest.mark.parametrize("autocommit", [False, True]) -def test_rowcount(shared_instance, dbapi_database, autocommit): - try: - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - - cur.execute( + ) + assert self._cursor.rowcount == 1 + self._cursor.execute( """ - CREATE TABLE Singers ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) + UPDATE contacts SET first_name = 'new-name' WHERE contact_id = 1 + THEN RETURN contact_id, first_name """ ) - conn.commit() - - # executemany sets rowcount to the total modified rows - rows = [(i, f"Singer {i}") for i in range(100)] - cur.executemany( - "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s)", rows[:98] - ) - assert cur.rowcount == 98 - - # execute with INSERT - cur.execute( - "INSERT INTO Singers (SingerId, Name) VALUES (%s, %s), (%s, %s)", - [x for row in rows[98:] for x in row], - ) - assert cur.rowcount == 2 - - # execute with UPDATE - cur.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") - assert cur.rowcount == 25 - - # execute with SELECT - cur.execute("SELECT Name FROM Singers WHERE SingerId < 75") - assert len(cur.fetchall()) == 75 - # rowcount is not available for SELECT - assert cur.rowcount == -1 - - # execute with DELETE - cur.execute("DELETE FROM Singers") - assert cur.rowcount == 100 - - # execute with UPDATE matching 0 rows - cur.execute("UPDATE Singers SET Name = 'Cher' WHERE SingerId < 25") - assert cur.rowcount == 0 - - conn.commit() - cur.execute("DROP TABLE Singers") - conn.commit() - finally: - # Delete table - table = dbapi_database.table("Singers") - if table.exists(): - op = dbapi_database.update_ddl(["DROP TABLE Singers"]) - op.result() - - -@pytest.mark.parametrize("autocommit", [False, True]) -@pytest.mark.skipif( - _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." -) -def test_dml_returning_insert(shared_instance, dbapi_database, autocommit): - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - cur.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') -THEN RETURN contact_id, first_name - """ - ) - assert cur.fetchone() == (1, "first-name") - assert cur.rowcount == 1 - conn.commit() - - -@pytest.mark.parametrize("autocommit", [False, True]) -@pytest.mark.skipif( - _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." -) -def test_dml_returning_update(shared_instance, dbapi_database, autocommit): - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - cur.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') - """ - ) - assert cur.rowcount == 1 - cur.execute( - """ -UPDATE contacts SET first_name = 'new-name' WHERE contact_id = 1 -THEN RETURN contact_id, first_name - """ - ) - assert cur.fetchone() == (1, "new-name") - assert cur.rowcount == 1 - conn.commit() + assert self._cursor.fetchone() == (1, "new-name") + assert self._cursor.rowcount == 1 + self._conn.commit() - -@pytest.mark.parametrize("autocommit", [False, True]) -@pytest.mark.skipif( - _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." -) -def test_dml_returning_delete(shared_instance, dbapi_database, autocommit): - conn = Connection(shared_instance, dbapi_database) - conn.autocommit = autocommit - cur = conn.cursor() - cur.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@example.com') - """ + @pytest.mark.parametrize("autocommit", [False, True]) + @pytest.mark.skipif( + _helpers.USE_EMULATOR, reason="Emulator does not support DML Returning." ) - assert cur.rowcount == 1 - cur.execute( - """ -DELETE FROM contacts WHERE contact_id = 1 -THEN RETURN contact_id, first_name - """ - ) - assert cur.fetchone() == (1, "first-name") - assert cur.rowcount == 1 - conn.commit() - - -def _execute_common_precommit_statements(cursor: Cursor): - # execute several DML statements within one transaction - cursor.execute( - """ - INSERT INTO contacts (contact_id, first_name, last_name, email) - VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - cursor.execute( - """ - UPDATE contacts - SET first_name = 'updated-first-name' - WHERE first_name = 'first-name' - """ - ) - cursor.execute( + def test_dml_returning_delete(self, autocommit): + self._conn.autocommit = autocommit + self._cursor.execute( + """ + INSERT INTO contacts (contact_id, first_name, last_name, email) + VALUES (1, 'first-name', 'last-name', 'test.email@example.com') """ - UPDATE contacts - SET email = 'test.email_updated@domen.ru' - WHERE email = 'test.email@domen.ru' + ) + assert self._cursor.rowcount == 1 + self._cursor.execute( + """ + DELETE FROM contacts WHERE contact_id = 1 + THEN RETURN contact_id, first_name """ - ) - return ( - 1, - "updated-first-name", - "last-name", - "test.email_updated@domen.ru", - ) + ) + assert self._cursor.fetchone() == (1, "first-name") + assert self._cursor.rowcount == 1 + self._conn.commit() diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 1628f84062..91b2e3d5e8 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -19,6 +19,7 @@ import unittest import warnings import pytest +from google.cloud.spanner_dbapi.exceptions import InterfaceError, OperationalError PROJECT = "test-project" INSTANCE = "test-instance" @@ -36,6 +37,9 @@ class _CredentialsWithScopes(credentials.Credentials, credentials.Scoped): class TestConnection(unittest.TestCase): + def setUp(self): + self._under_test = self._make_connection() + def _get_client_info(self): from google.api_core.gapic_v1.client_info import ClientInfo @@ -226,6 +230,8 @@ def test_snapshot_checkout(self): session_checkout = mock.MagicMock(autospec=True) connection._session_checkout = session_checkout + release_session = mock.MagicMock() + connection._release_session = release_session snapshot = connection.snapshot_checkout() session_checkout.assert_called_once() @@ -234,6 +240,7 @@ def test_snapshot_checkout(self): connection.commit() self.assertIsNone(connection._snapshot) + release_session.assert_called_once() connection.snapshot_checkout() self.assertIsNotNone(connection._snapshot) @@ -280,7 +287,9 @@ def test_close(self, mock_client): @mock.patch.object(warnings, "warn") def test_commit(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING + from google.cloud.spanner_dbapi.connection import ( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, + ) connection = Connection(INSTANCE, DATABASE) @@ -307,7 +316,7 @@ def test_commit(self, mock_warn): connection._autocommit = True connection.commit() mock_warn.assert_called_once_with( - AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2 + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 ) def test_commit_database_error(self): @@ -321,7 +330,9 @@ def test_commit_database_error(self): @mock.patch.object(warnings, "warn") def test_rollback(self, mock_warn): from google.cloud.spanner_dbapi import Connection - from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING + from google.cloud.spanner_dbapi.connection import ( + CLIENT_TRANSACTION_NOT_STARTED_WARNING, + ) connection = Connection(INSTANCE, DATABASE) @@ -333,6 +344,7 @@ def test_rollback(self, mock_warn): mock_release.assert_not_called() mock_transaction = mock.MagicMock() + mock_transaction.committed = mock_transaction.rolled_back = False connection._transaction = mock_transaction mock_rollback = mock.MagicMock() mock_transaction.rollback = mock_rollback @@ -348,7 +360,7 @@ def test_rollback(self, mock_warn): connection._autocommit = True connection.rollback() mock_warn.assert_called_once_with( - AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2 + CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 ) @mock.patch("google.cloud.spanner_v1.database.Database", autospec=True) @@ -385,6 +397,35 @@ def test_as_context_manager(self): self.assertTrue(connection.is_closed) + def test_begin_cursor_closed(self): + self._under_test.close() + + with self.assertRaises(InterfaceError): + self._under_test.begin() + + self.assertEqual(self._under_test._transaction_begin_marked, False) + + def test_begin_transaction_begin_marked(self): + self._under_test._transaction_begin_marked = True + + with self.assertRaises(OperationalError): + self._under_test.begin() + + def test_begin_transaction_started(self): + mock_transaction = mock.MagicMock() + mock_transaction.committed = mock_transaction.rolled_back = False + self._under_test._transaction = mock_transaction + + with self.assertRaises(OperationalError): + self._under_test.begin() + + self.assertEqual(self._under_test._transaction_begin_marked, False) + + def test_begin(self): + self._under_test.begin() + + self.assertEqual(self._under_test._transaction_begin_marked, True) + def test_run_statement_wo_retried(self): """Check that Connection remembers executed statements.""" from google.cloud.spanner_dbapi.checksum import ResultsChecksum @@ -485,7 +526,8 @@ def test_rollback_clears_statements(self, mock_transaction): cleared, when the transaction is roll backed. """ connection = self._make_connection() - connection._transaction = mock.Mock() + mock_transaction.committed = mock_transaction.rolled_back = False + connection._transaction = mock_transaction connection._statements = [{}, {}] self.assertEqual(len(connection._statements), 2) diff --git a/tests/unit/spanner_dbapi/test_parse_utils.py b/tests/unit/spanner_dbapi/test_parse_utils.py index 162535349f..06819c3a3d 100644 --- a/tests/unit/spanner_dbapi/test_parse_utils.py +++ b/tests/unit/spanner_dbapi/test_parse_utils.py @@ -53,6 +53,12 @@ def test_classify_stmt(self): ("CREATE ROLE parent", StatementType.DDL), ("commit", StatementType.CLIENT_SIDE), (" commit TRANSACTION ", StatementType.CLIENT_SIDE), + ("begin", StatementType.CLIENT_SIDE), + ("start", StatementType.CLIENT_SIDE), + ("begin transaction", StatementType.CLIENT_SIDE), + ("start transaction", StatementType.CLIENT_SIDE), + ("rollback", StatementType.CLIENT_SIDE), + (" rollback TRANSACTION ", StatementType.CLIENT_SIDE), ("GRANT SELECT ON TABLE Singers TO ROLE parent", StatementType.DDL), ("REVOKE SELECT ON TABLE Singers TO ROLE parent", StatementType.DDL), ("GRANT ROLE parent TO ROLE child", StatementType.DDL),