diff --git a/spanner/google/cloud/spanner/database.py b/spanner/google/cloud/spanner/database.py index a449f304bf79..8df06812949d 100644 --- a/spanner/google/cloud/spanner/database.py +++ b/spanner/google/cloud/spanner/database.py @@ -380,8 +380,7 @@ def batch(self): """ return BatchCheckout(self) - def snapshot(self, read_timestamp=None, min_read_timestamp=None, - max_staleness=None, exact_staleness=None): + def snapshot(self, **kw): """Return an object which wraps a snapshot. The wrapper *must* be used as a context manager, with the snapshot @@ -390,38 +389,15 @@ def snapshot(self, read_timestamp=None, min_read_timestamp=None, See https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly - If no options are passed, reads will use the ``strong`` model, reading - at a timestamp where all previously committed transactions are visible. - - :type read_timestamp: :class:`datetime.datetime` - :param read_timestamp: Execute all reads at the given timestamp. - - :type min_read_timestamp: :class:`datetime.datetime` - :param min_read_timestamp: Execute all reads at a - timestamp >= ``min_read_timestamp``. - - :type max_staleness: :class:`datetime.timedelta` - :param max_staleness: Read data at a - timestamp >= NOW - ``max_staleness`` seconds. - - :type exact_staleness: :class:`datetime.timedelta` - :param exact_staleness: Execute all reads at a timestamp that is - ``exact_staleness`` old. - - :rtype: :class:`~google.cloud.spanner.snapshot.Snapshot` - :returns: a snapshot bound to this session - :raises: :exc:`ValueError` if the session has not yet been created. + :type kw: dict + :param kw: + Passed through to + :class:`~google.cloud.spanner.snapshot.Snapshot` constructor. :rtype: :class:`~google.cloud.spanner.database.SnapshotCheckout` :returns: new wrapper """ - return SnapshotCheckout( - self, - read_timestamp=read_timestamp, - min_read_timestamp=min_read_timestamp, - max_staleness=max_staleness, - exact_staleness=exact_staleness, - ) + return SnapshotCheckout(self, **kw) class BatchCheckout(object): @@ -467,40 +443,20 @@ class SnapshotCheckout(object): :type database: :class:`~google.cloud.spannder.database.Database` :param database: database to use - :type read_timestamp: :class:`datetime.datetime` - :param read_timestamp: Execute all reads at the given timestamp. - - :type min_read_timestamp: :class:`datetime.datetime` - :param min_read_timestamp: Execute all reads at a - timestamp >= ``min_read_timestamp``. - - :type max_staleness: :class:`datetime.timedelta` - :param max_staleness: Read data at a - timestamp >= NOW - ``max_staleness`` seconds. - - :type exact_staleness: :class:`datetime.timedelta` - :param exact_staleness: Execute all reads at a timestamp that is - ``exact_staleness`` old. + :type kw: dict + :param kw: + Passed through to + :class:`~google.cloud.spanner.snapshot.Snapshot` constructor. """ - def __init__(self, database, read_timestamp=None, min_read_timestamp=None, - max_staleness=None, exact_staleness=None): + def __init__(self, database, **kw): self._database = database self._session = None - self._read_timestamp = read_timestamp - self._min_read_timestamp = min_read_timestamp - self._max_staleness = max_staleness - self._exact_staleness = exact_staleness + self._kw = kw def __enter__(self): """Begin ``with`` block.""" session = self._session = self._database._pool.get() - return Snapshot( - session, - read_timestamp=self._read_timestamp, - min_read_timestamp=self._min_read_timestamp, - max_staleness=self._max_staleness, - exact_staleness=self._exact_staleness, - ) + return Snapshot(session, **self._kw) def __exit__(self, exc_type, exc_val, exc_tb): """End ``with`` block.""" diff --git a/spanner/google/cloud/spanner/session.py b/spanner/google/cloud/spanner/session.py index 45baffa92d43..36e550061526 100644 --- a/spanner/google/cloud/spanner/session.py +++ b/spanner/google/cloud/spanner/session.py @@ -139,30 +139,15 @@ def delete(self): raise NotFound(self.name) raise - def snapshot(self, read_timestamp=None, min_read_timestamp=None, - max_staleness=None, exact_staleness=None): + def snapshot(self, **kw): """Create a snapshot to perform a set of reads with shared staleness. See https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly - If no options are passed, reads will use the ``strong`` model, reading - at a timestamp where all previously committed transactions are visible. - - :type read_timestamp: :class:`datetime.datetime` - :param read_timestamp: Execute all reads at the given timestamp. - - :type min_read_timestamp: :class:`datetime.datetime` - :param min_read_timestamp: Execute all reads at a - timestamp >= ``min_read_timestamp``. - - :type max_staleness: :class:`datetime.timedelta` - :param max_staleness: Read data at a - timestamp >= NOW - ``max_staleness`` seconds. - - :type exact_staleness: :class:`datetime.timedelta` - :param exact_staleness: Execute all reads at a timestamp that is - ``exact_staleness`` old. + :type kw: dict + :param kw: Passed through to + :class:`~google.cloud.spanner.snapshot.Snapshot` ctor. :rtype: :class:`~google.cloud.spanner.snapshot.Snapshot` :returns: a snapshot bound to this session @@ -171,11 +156,7 @@ def snapshot(self, read_timestamp=None, min_read_timestamp=None, if self._session_id is None: raise ValueError("Session has not been created.") - return Snapshot(self, - read_timestamp=read_timestamp, - min_read_timestamp=min_read_timestamp, - max_staleness=max_staleness, - exact_staleness=exact_staleness) + return Snapshot(self, **kw) def read(self, table, columns, keyset, index='', limit=0, resume_token=b''): @@ -292,7 +273,7 @@ def run_in_transaction(self, func, *args, **kw): txn = self.transaction() else: txn = self._transaction - if txn._id is None: + if txn._transaction_id is None: txn.begin() try: func(txn, *args, **kw) diff --git a/spanner/google/cloud/spanner/snapshot.py b/spanner/google/cloud/spanner/snapshot.py index 05fcba63f322..e0da23f3acd9 100644 --- a/spanner/google/cloud/spanner/snapshot.py +++ b/spanner/google/cloud/spanner/snapshot.py @@ -34,6 +34,10 @@ class _SnapshotBase(_SessionWrapper): :type session: :class:`~google.cloud.spanner.session.Session` :param session: the session used to perform the commit """ + _multi_use = False + _transaction_id = None + _read_request_count = 0 + def _make_txn_selector(self): # pylint: disable=redundant-returns-doc """Helper for :meth:`read` / :meth:`execute_sql`. @@ -70,7 +74,15 @@ def read(self, table, columns, keyset, index='', limit=0, :rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet` :returns: a result set instance which can be used to consume rows. + :raises: ValueError for reuse of single-use snapshots, or if a + transaction ID is pending for multiple-use snapshots. """ + if self._read_request_count > 0: + if not self._multi_use: + raise ValueError("Cannot re-use single-use snapshot.") + if self._transaction_id is None: + raise ValueError("Transaction ID pending.") + database = self._session._database api = database.spanner_api options = _options_with_prefix(database.name) @@ -81,7 +93,12 @@ def read(self, table, columns, keyset, index='', limit=0, transaction=transaction, index=index, limit=limit, resume_token=resume_token, options=options) - return StreamedResultSet(iterator) + self._read_request_count += 1 + + if self._multi_use: + return StreamedResultSet(iterator, source=self) + else: + return StreamedResultSet(iterator) def execute_sql(self, sql, params=None, param_types=None, query_mode=None, resume_token=b''): @@ -109,7 +126,15 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None, :rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet` :returns: a result set instance which can be used to consume rows. + :raises: ValueError for reuse of single-use snapshots, or if a + transaction ID is pending for multiple-use snapshots. """ + if self._read_request_count > 0: + if not self._multi_use: + raise ValueError("Cannot re-use single-use snapshot.") + if self._transaction_id is None: + raise ValueError("Transaction ID pending.") + if params is not None: if param_types is None: raise ValueError( @@ -128,7 +153,12 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None, transaction=transaction, params=params_pb, param_types=param_types, query_mode=query_mode, resume_token=resume_token, options=options) - return StreamedResultSet(iterator) + self._read_request_count += 1 + + if self._multi_use: + return StreamedResultSet(iterator, source=self) + else: + return StreamedResultSet(iterator) class Snapshot(_SnapshotBase): @@ -157,9 +187,16 @@ class Snapshot(_SnapshotBase): :type exact_staleness: :class:`datetime.timedelta` :param exact_staleness: Execute all reads at a timestamp that is ``exact_staleness`` old. + + :type multi_use: :class:`bool` + :param multi_use: If true, multipl :meth:`read` / :meth:`execute_sql` + calls can be performed with the snapshot in the + context of a read-only transaction, used to ensure + isolation / consistency. Incompatible with + ``max_staleness`` and ``min_read_timestamp``. """ def __init__(self, session, read_timestamp=None, min_read_timestamp=None, - max_staleness=None, exact_staleness=None): + max_staleness=None, exact_staleness=None, multi_use=False): super(Snapshot, self).__init__(session) opts = [ read_timestamp, min_read_timestamp, max_staleness, exact_staleness] @@ -168,14 +205,24 @@ def __init__(self, session, read_timestamp=None, min_read_timestamp=None, if len(flagged) > 1: raise ValueError("Supply zero or one options.") + if multi_use: + if min_read_timestamp is not None or max_staleness is not None: + raise ValueError( + "'multi_use' is incompatible with " + "'min_read_timestamp' / 'max_staleness'") + self._strong = len(flagged) == 0 self._read_timestamp = read_timestamp self._min_read_timestamp = min_read_timestamp self._max_staleness = max_staleness self._exact_staleness = exact_staleness + self._multi_use = multi_use def _make_txn_selector(self): """Helper for :meth:`read`.""" + if self._transaction_id is not None: + return TransactionSelector(id=self._transaction_id) + if self._read_timestamp: key = 'read_timestamp' value = _datetime_to_pb_timestamp(self._read_timestamp) @@ -194,4 +241,34 @@ def _make_txn_selector(self): options = TransactionOptions( read_only=TransactionOptions.ReadOnly(**{key: value})) - return TransactionSelector(single_use=options) + + if self._multi_use: + return TransactionSelector(begin=options) + else: + return TransactionSelector(single_use=options) + + def begin(self): + """Begin a transaction on the database. + + :rtype: bytes + :returns: the ID for the newly-begun transaction. + :raises: ValueError if the transaction is already begun, committed, + or rolled back. + """ + if not self._multi_use: + raise ValueError("Cannot call 'begin' single-use snapshots") + + if self._transaction_id is not None: + raise ValueError("Read-only transaction already begun") + + if self._read_request_count > 0: + raise ValueError("Read-only transaction already pending") + + database = self._session._database + api = database.spanner_api + options = _options_with_prefix(database.name) + txn_selector = self._make_txn_selector() + response = api.begin_transaction( + self._session.name, txn_selector.begin, options=options) + self._transaction_id = response.id + return self._transaction_id diff --git a/spanner/google/cloud/spanner/streamed.py b/spanner/google/cloud/spanner/streamed.py index 19333844b1c1..7aa0ca43156e 100644 --- a/spanner/google/cloud/spanner/streamed.py +++ b/spanner/google/cloud/spanner/streamed.py @@ -32,8 +32,11 @@ class StreamedResultSet(object): Iterator yielding :class:`google.cloud.proto.spanner.v1.result_set_pb2.PartialResultSet` instances. + + :type source: :class:`~google.cloud.spanner.snapshot.Snapshot` + :param source: Snapshot from which the result set was fetched. """ - def __init__(self, response_iterator): + def __init__(self, response_iterator, source=None): self._response_iterator = response_iterator self._rows = [] # Fully-processed rows self._counter = 0 # Counter for processed responses @@ -42,6 +45,7 @@ def __init__(self, response_iterator): self._resume_token = None # To resume from last received PRS self._current_row = [] # Accumulated values for incomplete row self._pending_chunk = None # Incomplete value + self._source = source # Source snapshot @property def rows(self): @@ -130,7 +134,11 @@ def consume_next(self): self._resume_token = response.resume_token if self._metadata is None: # first response - self._metadata = response.metadata + metadata = self._metadata = response.metadata + + source = self._source + if source is not None and source._transaction_id is None: + source._transaction_id = metadata.transaction.id if response.HasField('stats'): # last response self._stats = response.stats diff --git a/spanner/google/cloud/spanner/transaction.py b/spanner/google/cloud/spanner/transaction.py index af2140896830..8f977412bd66 100644 --- a/spanner/google/cloud/spanner/transaction.py +++ b/spanner/google/cloud/spanner/transaction.py @@ -27,11 +27,8 @@ class Transaction(_SnapshotBase, _BatchBase): """Implement read-write transaction semantics for a session.""" committed = None """Timestamp at which the transaction was successfully committed.""" - - def __init__(self, session): - super(Transaction, self).__init__(session) - self._id = None - self._rolled_back = False + _rolled_back = False + _multi_use = True def _check_state(self): """Helper for :meth:`commit` et al. @@ -39,7 +36,7 @@ def _check_state(self): :raises: :exc:`ValueError` if the object's state is invalid for making API requests. """ - if self._id is None: + if self._transaction_id is None: raise ValueError("Transaction is not begun") if self.committed is not None: @@ -56,7 +53,7 @@ def _make_txn_selector(self): :returns: a selector configured for read-write transaction semantics. """ self._check_state() - return TransactionSelector(id=self._id) + return TransactionSelector(id=self._transaction_id) def begin(self): """Begin a transaction on the database. @@ -66,7 +63,7 @@ def begin(self): :raises: ValueError if the transaction is already begun, committed, or rolled back. """ - if self._id is not None: + if self._transaction_id is not None: raise ValueError("Transaction already begun") if self.committed is not None: @@ -82,8 +79,8 @@ def begin(self): read_write=TransactionOptions.ReadWrite()) response = api.begin_transaction( self._session.name, txn_options, options=options) - self._id = response.id - return self._id + self._transaction_id = response.id + return self._transaction_id def rollback(self): """Roll back a transaction on the database.""" @@ -91,7 +88,7 @@ def rollback(self): database = self._session._database api = database.spanner_api options = _options_with_prefix(database.name) - api.rollback(self._session.name, self._id, options=options) + api.rollback(self._session.name, self._transaction_id, options=options) self._rolled_back = True def commit(self): @@ -111,7 +108,7 @@ def commit(self): options = _options_with_prefix(database.name) response = api.commit( self._session.name, self._mutations, - transaction_id=self._id, options=options) + transaction_id=self._transaction_id, options=options) self.committed = _pb_timestamp_to_datetime( response.commit_timestamp) return self.committed diff --git a/spanner/tests/system/test_system.py b/spanner/tests/system/test_system.py index e6d73f977e94..28ef66862313 100644 --- a/spanner/tests/system/test_system.py +++ b/spanner/tests/system/test_system.py @@ -18,6 +18,7 @@ import os import struct import threading +import time import unittest from google.cloud.proto.spanner.v1.type_pb2 import ARRAY @@ -686,6 +687,56 @@ def test_snapshot_read_w_various_staleness(self): rows = list(strong.read(self.TABLE, self.COLUMNS, self.ALL)) self._check_row_data(rows, all_data_rows) + def test_multiuse_snapshot_read_isolation_strong(self): + ROW_COUNT = 40 + session, committed = self._set_up_table(ROW_COUNT) + all_data_rows = list(self._row_data(ROW_COUNT)) + strong = session.snapshot(multi_use=True) + + before = list(strong.read(self.TABLE, self.COLUMNS, self.ALL)) + self._check_row_data(before, all_data_rows) + + with self._db.batch() as batch: + batch.delete(self.TABLE, self.ALL) + + after = list(strong.read(self.TABLE, self.COLUMNS, self.ALL)) + self._check_row_data(after, all_data_rows) + + def test_multiuse_snapshot_read_isolation_read_timestamp(self): + ROW_COUNT = 40 + session, committed = self._set_up_table(ROW_COUNT) + all_data_rows = list(self._row_data(ROW_COUNT)) + read_ts = session.snapshot(read_timestamp=committed, multi_use=True) + + before = list(read_ts.read(self.TABLE, self.COLUMNS, self.ALL)) + self._check_row_data(before, all_data_rows) + + with self._db.batch() as batch: + batch.delete(self.TABLE, self.ALL) + + after = list(read_ts.read(self.TABLE, self.COLUMNS, self.ALL)) + self._check_row_data(after, all_data_rows) + + def test_multiuse_snapshot_read_isolation_exact_staleness(self): + ROW_COUNT = 40 + + session, committed = self._set_up_table(ROW_COUNT) + all_data_rows = list(self._row_data(ROW_COUNT)) + + time.sleep(1) + delta = datetime.timedelta(microseconds=1000) + + exact = session.snapshot(exact_staleness=delta, multi_use=True) + + before = list(exact.read(self.TABLE, self.COLUMNS, self.ALL)) + self._check_row_data(before, all_data_rows) + + with self._db.batch() as batch: + batch.delete(self.TABLE, self.ALL) + + after = list(exact.read(self.TABLE, self.COLUMNS, self.ALL)) + self._check_row_data(after, all_data_rows) + def test_read_w_manual_consume(self): ROW_COUNT = 4000 session, committed = self._set_up_table(ROW_COUNT) @@ -777,7 +828,7 @@ def test_read_w_ranges(self): START = 1000 END = 2000 session, committed = self._set_up_table(ROW_COUNT) - snapshot = session.snapshot(read_timestamp=committed) + snapshot = session.snapshot(read_timestamp=committed, multi_use=True) all_data_rows = list(self._row_data(ROW_COUNT)) closed_closed = KeyRange(start_closed=[START], end_closed=[END]) @@ -835,6 +886,22 @@ def _check_sql_results(self, snapshot, sql, params, param_types, expected): sql, params=params, param_types=param_types)) self._check_row_data(rows, expected=expected) + def test_multiuse_snapshot_execute_sql_isolation_strong(self): + ROW_COUNT = 40 + SQL = 'SELECT * FROM {}'.format(self.TABLE) + session, committed = self._set_up_table(ROW_COUNT) + all_data_rows = list(self._row_data(ROW_COUNT)) + strong = session.snapshot(multi_use=True) + + before = list(strong.execute_sql(SQL)) + self._check_row_data(before, all_data_rows) + + with self._db.batch() as batch: + batch.delete(self.TABLE, self.ALL) + + after = list(strong.execute_sql(SQL)) + self._check_row_data(after, all_data_rows) + def test_execute_sql_returning_array_of_struct(self): SQL = ( "SELECT ARRAY(SELECT AS STRUCT C1, C2 " @@ -867,7 +934,8 @@ def test_execute_sql_w_query_param(self): self.ALL_TYPES_COLUMNS, self.ALL_TYPES_ROWDATA) - snapshot = session.snapshot(read_timestamp=batch.committed) + snapshot = session.snapshot( + read_timestamp=batch.committed, multi_use=True) # Cannot equality-test array values. See below for a test w/ # array of IDs. diff --git a/spanner/tests/unit/test_database.py b/spanner/tests/unit/test_database.py index 6216d8a348fd..aa1643ed7582 100644 --- a/spanner/tests/unit/test_database.py +++ b/spanner/tests/unit/test_database.py @@ -682,12 +682,9 @@ def test_snapshot_defaults(self): checkout = database.snapshot() self.assertIsInstance(checkout, SnapshotCheckout) self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertIsNone(checkout._min_read_timestamp) - self.assertIsNone(checkout._max_staleness) - self.assertIsNone(checkout._exact_staleness) + self.assertEqual(checkout._kw, {}) - def test_snapshot_w_read_timestamp(self): + def test_snapshot_w_read_timestamp_and_multi_use(self): import datetime from google.cloud._helpers import UTC from google.cloud.spanner.database import SnapshotCheckout @@ -700,78 +697,12 @@ def test_snapshot_w_read_timestamp(self): pool.put(session) database = self._make_one(self.DATABASE_ID, instance, pool=pool) - checkout = database.snapshot(read_timestamp=now) + checkout = database.snapshot(read_timestamp=now, multi_use=True) self.assertIsInstance(checkout, SnapshotCheckout) self.assertIs(checkout._database, database) - self.assertEqual(checkout._read_timestamp, now) - self.assertIsNone(checkout._min_read_timestamp) - self.assertIsNone(checkout._max_staleness) - self.assertIsNone(checkout._exact_staleness) - - def test_snapshot_w_min_read_timestamp(self): - import datetime - from google.cloud._helpers import UTC - from google.cloud.spanner.database import SnapshotCheckout - - now = datetime.datetime.utcnow().replace(tzinfo=UTC) - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - pool = _Pool() - session = _Session() - pool.put(session) - database = self._make_one(self.DATABASE_ID, instance, pool=pool) - - checkout = database.snapshot(min_read_timestamp=now) - - self.assertIsInstance(checkout, SnapshotCheckout) - self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertEqual(checkout._min_read_timestamp, now) - self.assertIsNone(checkout._max_staleness) - self.assertIsNone(checkout._exact_staleness) - - def test_snapshot_w_max_staleness(self): - import datetime - from google.cloud.spanner.database import SnapshotCheckout - - staleness = datetime.timedelta(seconds=1, microseconds=234567) - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - pool = _Pool() - session = _Session() - pool.put(session) - database = self._make_one(self.DATABASE_ID, instance, pool=pool) - - checkout = database.snapshot(max_staleness=staleness) - - self.assertIsInstance(checkout, SnapshotCheckout) - self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertIsNone(checkout._min_read_timestamp) - self.assertEqual(checkout._max_staleness, staleness) - self.assertIsNone(checkout._exact_staleness) - - def test_snapshot_w_exact_staleness(self): - import datetime - from google.cloud.spanner.database import SnapshotCheckout - - staleness = datetime.timedelta(seconds=1, microseconds=234567) - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - pool = _Pool() - session = _Session() - pool.put(session) - database = self._make_one(self.DATABASE_ID, instance, pool=pool) - - checkout = database.snapshot(exact_staleness=staleness) - - self.assertIsInstance(checkout, SnapshotCheckout) - self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertIsNone(checkout._min_read_timestamp) - self.assertIsNone(checkout._max_staleness) - self.assertEqual(checkout._exact_staleness, staleness) + self.assertEqual( + checkout._kw, {'read_timestamp': now, 'multi_use': True}) class TestBatchCheckout(_BaseTest): @@ -862,20 +793,18 @@ def test_ctor_defaults(self): checkout = self._make_one(database) self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertIsNone(checkout._min_read_timestamp) - self.assertIsNone(checkout._max_staleness) - self.assertIsNone(checkout._exact_staleness) + self.assertEqual(checkout._kw, {}) with checkout as snapshot: self.assertIsNone(pool._session) self.assertIsInstance(snapshot, Snapshot) self.assertIs(snapshot._session, session) self.assertTrue(snapshot._strong) + self.assertFalse(snapshot._multi_use) self.assertIs(pool._session, session) - def test_ctor_w_read_timestamp(self): + def test_ctor_w_read_timestamp_and_multi_use(self): import datetime from google.cloud._helpers import UTC from google.cloud.spanner.snapshot import Snapshot @@ -886,99 +815,17 @@ def test_ctor_w_read_timestamp(self): pool = database._pool = _Pool() pool.put(session) - checkout = self._make_one(database, read_timestamp=now) + checkout = self._make_one(database, read_timestamp=now, multi_use=True) self.assertIs(checkout._database, database) - self.assertEqual(checkout._read_timestamp, now) - self.assertIsNone(checkout._min_read_timestamp) - self.assertIsNone(checkout._max_staleness) - self.assertIsNone(checkout._exact_staleness) + self.assertEqual(checkout._kw, + {'read_timestamp': now, 'multi_use': True}) with checkout as snapshot: self.assertIsNone(pool._session) self.assertIsInstance(snapshot, Snapshot) self.assertIs(snapshot._session, session) - self.assertFalse(snapshot._strong) self.assertEqual(snapshot._read_timestamp, now) - - self.assertIs(pool._session, session) - - def test_ctor_w_min_read_timestamp(self): - import datetime - from google.cloud._helpers import UTC - from google.cloud.spanner.snapshot import Snapshot - - now = datetime.datetime.utcnow().replace(tzinfo=UTC) - database = _Database(self.DATABASE_NAME) - session = _Session(database) - pool = database._pool = _Pool() - pool.put(session) - - checkout = self._make_one(database, min_read_timestamp=now) - self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertEqual(checkout._min_read_timestamp, now) - self.assertIsNone(checkout._max_staleness) - self.assertIsNone(checkout._exact_staleness) - - with checkout as snapshot: - self.assertIsNone(pool._session) - self.assertIsInstance(snapshot, Snapshot) - self.assertIs(snapshot._session, session) - self.assertFalse(snapshot._strong) - self.assertEqual(snapshot._min_read_timestamp, now) - - self.assertIs(pool._session, session) - - def test_ctor_w_max_staleness(self): - import datetime - from google.cloud.spanner.snapshot import Snapshot - - staleness = datetime.timedelta(seconds=1, microseconds=234567) - database = _Database(self.DATABASE_NAME) - session = _Session(database) - pool = database._pool = _Pool() - pool.put(session) - - checkout = self._make_one(database, max_staleness=staleness) - self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertIsNone(checkout._min_read_timestamp) - self.assertEqual(checkout._max_staleness, staleness) - self.assertIsNone(checkout._exact_staleness) - - with checkout as snapshot: - self.assertIsNone(pool._session) - self.assertIsInstance(snapshot, Snapshot) - self.assertIs(snapshot._session, session) - self.assertFalse(snapshot._strong) - self.assertEqual(snapshot._max_staleness, staleness) - - self.assertIs(pool._session, session) - - def test_ctor_w_exact_staleness(self): - import datetime - from google.cloud.spanner.snapshot import Snapshot - - staleness = datetime.timedelta(seconds=1, microseconds=234567) - database = _Database(self.DATABASE_NAME) - session = _Session(database) - pool = database._pool = _Pool() - pool.put(session) - - checkout = self._make_one(database, exact_staleness=staleness) - - self.assertIs(checkout._database, database) - self.assertIsNone(checkout._read_timestamp) - self.assertIsNone(checkout._min_read_timestamp) - self.assertIsNone(checkout._max_staleness) - self.assertEqual(checkout._exact_staleness, staleness) - - with checkout as snapshot: - self.assertIsNone(pool._session) - self.assertIsInstance(snapshot, Snapshot) - self.assertIs(snapshot._session, session) - self.assertFalse(snapshot._strong) - self.assertEqual(snapshot._exact_staleness, staleness) + self.assertTrue(snapshot._multi_use) self.assertIs(pool._session, session) diff --git a/spanner/tests/unit/test_session.py b/spanner/tests/unit/test_session.py index ce9f81eccc7a..100555c8e49f 100644 --- a/spanner/tests/unit/test_session.py +++ b/spanner/tests/unit/test_session.py @@ -225,6 +225,21 @@ def test_snapshot_created(self): self.assertIsInstance(snapshot, Snapshot) self.assertIs(snapshot._session, session) self.assertTrue(snapshot._strong) + self.assertFalse(snapshot._multi_use) + + def test_snapshot_created_w_multi_use(self): + from google.cloud.spanner.snapshot import Snapshot + + database = _Database(self.DATABASE_NAME) + session = self._make_one(database) + session._session_id = 'DEADBEEF' # emulate 'session.create()' + + snapshot = session.snapshot(multi_use=True) + + self.assertIsInstance(snapshot, Snapshot) + self.assertTrue(snapshot._session is session) + self.assertTrue(snapshot._strong) + self.assertTrue(snapshot._multi_use) def test_read_not_created(self): from google.cloud.spanner.keyset import KeySet @@ -403,7 +418,7 @@ def test_retry_transaction_w_commit_error_txn_already_begun(self): session = self._make_one(database) session._session_id = 'DEADBEEF' begun_txn = session._transaction = Transaction(session) - begun_txn._id = b'FACEDACE' + begun_txn._transaction_id = b'FACEDACE' called_with = [] diff --git a/spanner/tests/unit/test_snapshot.py b/spanner/tests/unit/test_snapshot.py index c5213dbd6cda..4717a14c2f24 100644 --- a/spanner/tests/unit/test_snapshot.py +++ b/spanner/tests/unit/test_snapshot.py @@ -53,12 +53,19 @@ def _makeDerived(self, session): class _Derived(self._getTargetClass()): + _transaction_id = None + _multi_use = False + def _make_txn_selector(self): from google.cloud.proto.spanner.v1.transaction_pb2 import ( TransactionOptions, TransactionSelector) + if self._transaction_id: + return TransactionSelector(id=self._transaction_id) options = TransactionOptions( read_only=TransactionOptions.ReadOnly(strong=True)) + if self._multi_use: + return TransactionSelector(begin=options) return TransactionSelector(single_use=options) return _Derived(session) @@ -105,7 +112,7 @@ def test_read_grpc_error(self): self.assertEqual(options.kwargs['metadata'], [('google-cloud-resource-prefix', database.name)]) - def test_read_normal(self): + def _read_helper(self, multi_use, first=True, count=0): from google.protobuf.struct_pb2 import Struct from google.cloud.proto.spanner.v1.result_set_pb2 import ( PartialResultSet, ResultSetMetadata, ResultSetStats) @@ -116,6 +123,7 @@ def test_read_normal(self): from google.cloud.spanner.keyset import KeySet from google.cloud.spanner._helpers import _make_value_pb + TXN_ID = b'DEADBEEF' VALUES = [ [u'bharney', 31], [u'phred', 32], @@ -147,11 +155,22 @@ def test_read_normal(self): _streaming_read_response=_MockCancellableIterator(*result_sets)) session = _Session(database) derived = self._makeDerived(session) + derived._multi_use = multi_use + derived._read_request_count = count + if not first: + derived._transaction_id = TXN_ID result_set = derived.read( TABLE_NAME, COLUMNS, KEYSET, index=INDEX, limit=LIMIT, resume_token=TOKEN) + self.assertEqual(derived._read_request_count, count + 1) + + if multi_use: + self.assertIs(result_set._source, derived) + else: + self.assertIsNone(result_set._source) + result_set.consume_all() self.assertEqual(list(result_set.rows), VALUES) self.assertEqual(result_set.metadata, metadata_pb) @@ -165,13 +184,39 @@ def test_read_normal(self): self.assertEqual(columns, COLUMNS) self.assertEqual(key_set, KEYSET.to_pb()) self.assertIsInstance(transaction, TransactionSelector) - self.assertTrue(transaction.single_use.read_only.strong) + if multi_use: + if first: + self.assertTrue(transaction.begin.read_only.strong) + else: + self.assertEqual(transaction.id, TXN_ID) + else: + self.assertTrue(transaction.single_use.read_only.strong) self.assertEqual(index, INDEX) self.assertEqual(limit, LIMIT) self.assertEqual(resume_token, TOKEN) self.assertEqual(options.kwargs['metadata'], [('google-cloud-resource-prefix', database.name)]) + def test_read_wo_multi_use(self): + self._read_helper(multi_use=False) + + def test_read_wo_multi_use_w_read_request_count_gt_0(self): + with self.assertRaises(ValueError): + self._read_helper(multi_use=False, count=1) + + def test_read_w_multi_use_wo_first(self): + self._read_helper(multi_use=True, first=False) + + def test_read_w_multi_use_wo_first_w_count_gt_0(self): + self._read_helper(multi_use=True, first=False, count=1) + + def test_read_w_multi_use_w_first(self): + self._read_helper(multi_use=True, first=True) + + def test_read_w_multi_use_w_first_w_count_gt_0(self): + with self.assertRaises(ValueError): + self._read_helper(multi_use=True, first=True, count=1) + def test_execute_sql_grpc_error(self): from google.cloud.proto.spanner.v1.transaction_pb2 import ( TransactionSelector) @@ -208,7 +253,7 @@ def test_execute_sql_w_params_wo_param_types(self): with self.assertRaises(ValueError): derived.execute_sql(SQL_QUERY_WITH_PARAM, PARAMS) - def test_execute_sql_normal(self): + def _execute_sql_helper(self, multi_use, first=True, count=0): from google.protobuf.struct_pb2 import Struct from google.cloud.proto.spanner.v1.result_set_pb2 import ( PartialResultSet, ResultSetMetadata, ResultSetStats) @@ -218,6 +263,7 @@ def test_execute_sql_normal(self): from google.cloud.proto.spanner.v1.type_pb2 import STRING, INT64 from google.cloud.spanner._helpers import _make_value_pb + TXN_ID = b'DEADBEEF' VALUES = [ [u'bharney', u'rhubbyl', 31], [u'phred', u'phlyntstone', 32], @@ -248,11 +294,22 @@ def test_execute_sql_normal(self): _execute_streaming_sql_response=iterator) session = _Session(database) derived = self._makeDerived(session) + derived._multi_use = multi_use + derived._read_request_count = count + if not first: + derived._transaction_id = TXN_ID result_set = derived.execute_sql( SQL_QUERY_WITH_PARAM, PARAMS, PARAM_TYPES, query_mode=MODE, resume_token=TOKEN) + self.assertEqual(derived._read_request_count, count + 1) + + if multi_use: + self.assertIs(result_set._source, derived) + else: + self.assertIsNone(result_set._source) + result_set.consume_all() self.assertEqual(list(result_set.rows), VALUES) self.assertEqual(result_set.metadata, metadata_pb) @@ -264,7 +321,13 @@ def test_execute_sql_normal(self): self.assertEqual(r_session, self.SESSION_NAME) self.assertEqual(sql, SQL_QUERY_WITH_PARAM) self.assertIsInstance(transaction, TransactionSelector) - self.assertTrue(transaction.single_use.read_only.strong) + if multi_use: + if first: + self.assertTrue(transaction.begin.read_only.strong) + else: + self.assertEqual(transaction.id, TXN_ID) + else: + self.assertTrue(transaction.single_use.read_only.strong) expected_params = Struct(fields={ key: _make_value_pb(value) for (key, value) in PARAMS.items()}) self.assertEqual(params, expected_params) @@ -274,6 +337,26 @@ def test_execute_sql_normal(self): self.assertEqual(options.kwargs['metadata'], [('google-cloud-resource-prefix', database.name)]) + def test_execute_sql_wo_multi_use(self): + self._execute_sql_helper(multi_use=False) + + def test_execute_sql_wo_multi_use_w_read_request_count_gt_0(self): + with self.assertRaises(ValueError): + self._execute_sql_helper(multi_use=False, count=1) + + def test_execute_sql_w_multi_use_wo_first(self): + self._execute_sql_helper(multi_use=True, first=False) + + def test_execute_sql_w_multi_use_wo_first_w_count_gt_0(self): + self._execute_sql_helper(multi_use=True, first=False, count=1) + + def test_execute_sql_w_multi_use_w_first(self): + self._execute_sql_helper(multi_use=True, first=True) + + def test_execute_sql_w_multi_use_w_first_w_count_gt_0(self): + with self.assertRaises(ValueError): + self._execute_sql_helper(multi_use=True, first=True, count=1) + class _MockCancellableIterator(object): @@ -298,6 +381,7 @@ class TestSnapshot(unittest.TestCase): DATABASE_NAME = INSTANCE_NAME + '/databases/' + DATABASE_ID SESSION_ID = 'session-id' SESSION_NAME = DATABASE_NAME + '/sessions/' + SESSION_ID + TRANSACTION_ID = b'DEADBEEF' def _getTargetClass(self): from google.cloud.spanner.snapshot import Snapshot @@ -326,6 +410,7 @@ def test_ctor_defaults(self): self.assertIsNone(snapshot._min_read_timestamp) self.assertIsNone(snapshot._max_staleness) self.assertIsNone(snapshot._exact_staleness) + self.assertFalse(snapshot._multi_use) def test_ctor_w_multiple_options(self): timestamp = self._makeTimestamp() @@ -346,6 +431,7 @@ def test_ctor_w_read_timestamp(self): self.assertIsNone(snapshot._min_read_timestamp) self.assertIsNone(snapshot._max_staleness) self.assertIsNone(snapshot._exact_staleness) + self.assertFalse(snapshot._multi_use) def test_ctor_w_min_read_timestamp(self): timestamp = self._makeTimestamp() @@ -357,6 +443,7 @@ def test_ctor_w_min_read_timestamp(self): self.assertEqual(snapshot._min_read_timestamp, timestamp) self.assertIsNone(snapshot._max_staleness) self.assertIsNone(snapshot._exact_staleness) + self.assertFalse(snapshot._multi_use) def test_ctor_w_max_staleness(self): duration = self._makeDuration() @@ -368,6 +455,7 @@ def test_ctor_w_max_staleness(self): self.assertIsNone(snapshot._min_read_timestamp) self.assertEqual(snapshot._max_staleness, duration) self.assertIsNone(snapshot._exact_staleness) + self.assertFalse(snapshot._multi_use) def test_ctor_w_exact_staleness(self): duration = self._makeDuration() @@ -379,6 +467,66 @@ def test_ctor_w_exact_staleness(self): self.assertIsNone(snapshot._min_read_timestamp) self.assertIsNone(snapshot._max_staleness) self.assertEqual(snapshot._exact_staleness, duration) + self.assertFalse(snapshot._multi_use) + + def test_ctor_w_multi_use(self): + session = _Session() + snapshot = self._make_one(session, multi_use=True) + self.assertTrue(snapshot._session is session) + self.assertTrue(snapshot._strong) + self.assertIsNone(snapshot._read_timestamp) + self.assertIsNone(snapshot._min_read_timestamp) + self.assertIsNone(snapshot._max_staleness) + self.assertIsNone(snapshot._exact_staleness) + self.assertTrue(snapshot._multi_use) + + def test_ctor_w_multi_use_and_read_timestamp(self): + timestamp = self._makeTimestamp() + session = _Session() + snapshot = self._make_one( + session, read_timestamp=timestamp, multi_use=True) + self.assertTrue(snapshot._session is session) + self.assertFalse(snapshot._strong) + self.assertEqual(snapshot._read_timestamp, timestamp) + self.assertIsNone(snapshot._min_read_timestamp) + self.assertIsNone(snapshot._max_staleness) + self.assertIsNone(snapshot._exact_staleness) + self.assertTrue(snapshot._multi_use) + + def test_ctor_w_multi_use_and_min_read_timestamp(self): + timestamp = self._makeTimestamp() + session = _Session() + + with self.assertRaises(ValueError): + self._make_one( + session, min_read_timestamp=timestamp, multi_use=True) + + def test_ctor_w_multi_use_and_max_staleness(self): + duration = self._makeDuration() + session = _Session() + + with self.assertRaises(ValueError): + self._make_one(session, max_staleness=duration, multi_use=True) + + def test_ctor_w_multi_use_and_exact_staleness(self): + duration = self._makeDuration() + session = _Session() + snapshot = self._make_one( + session, exact_staleness=duration, multi_use=True) + self.assertTrue(snapshot._session is session) + self.assertFalse(snapshot._strong) + self.assertIsNone(snapshot._read_timestamp) + self.assertIsNone(snapshot._min_read_timestamp) + self.assertIsNone(snapshot._max_staleness) + self.assertEqual(snapshot._exact_staleness, duration) + self.assertTrue(snapshot._multi_use) + + def test__make_txn_selector_w_transaction_id(self): + session = _Session() + snapshot = self._make_one(session) + snapshot._transaction_id = self.TRANSACTION_ID + selector = snapshot._make_txn_selector() + self.assertEqual(selector.id, self.TRANSACTION_ID) def test__make_txn_selector_strong(self): session = _Session() @@ -429,6 +577,127 @@ def test__make_txn_selector_w_exact_staleness(self): self.assertEqual(options.read_only.exact_staleness.seconds, 3) self.assertEqual(options.read_only.exact_staleness.nanos, 123456000) + def test__make_txn_selector_strong_w_multi_use(self): + session = _Session() + snapshot = self._make_one(session, multi_use=True) + selector = snapshot._make_txn_selector() + options = selector.begin + self.assertTrue(options.read_only.strong) + + def test__make_txn_selector_w_read_timestamp_w_multi_use(self): + from google.cloud._helpers import _pb_timestamp_to_datetime + + timestamp = self._makeTimestamp() + session = _Session() + snapshot = self._make_one( + session, read_timestamp=timestamp, multi_use=True) + selector = snapshot._make_txn_selector() + options = selector.begin + self.assertEqual( + _pb_timestamp_to_datetime(options.read_only.read_timestamp), + timestamp) + + def test__make_txn_selector_w_exact_staleness_w_multi_use(self): + duration = self._makeDuration(seconds=3, microseconds=123456) + session = _Session() + snapshot = self._make_one( + session, exact_staleness=duration, multi_use=True) + selector = snapshot._make_txn_selector() + options = selector.begin + self.assertEqual(options.read_only.exact_staleness.seconds, 3) + self.assertEqual(options.read_only.exact_staleness.nanos, 123456000) + + def test_begin_wo_multi_use(self): + session = _Session() + snapshot = self._make_one(session) + with self.assertRaises(ValueError): + snapshot.begin() + + def test_begin_w_read_request_count_gt_0(self): + session = _Session() + snapshot = self._make_one(session, multi_use=True) + snapshot._read_request_count = 1 + with self.assertRaises(ValueError): + snapshot.begin() + + def test_begin_w_existing_txn_id(self): + session = _Session() + snapshot = self._make_one(session, multi_use=True) + snapshot._transaction_id = self.TRANSACTION_ID + with self.assertRaises(ValueError): + snapshot.begin() + + def test_begin_w_gax_error(self): + from google.gax.errors import GaxError + from google.cloud._helpers import _pb_timestamp_to_datetime + + database = _Database() + api = database.spanner_api = _FauxSpannerAPI( + _random_gax_error=True) + timestamp = self._makeTimestamp() + session = _Session(database) + snapshot = self._make_one( + session, read_timestamp=timestamp, multi_use=True) + + with self.assertRaises(GaxError): + snapshot.begin() + + session_id, txn_options, options = api._begun + self.assertEqual(session_id, session.name) + self.assertEqual( + _pb_timestamp_to_datetime(txn_options.read_only.read_timestamp), + timestamp) + self.assertEqual(options.kwargs['metadata'], + [('google-cloud-resource-prefix', database.name)]) + + def test_begin_ok_exact_staleness(self): + from google.cloud.proto.spanner.v1.transaction_pb2 import ( + Transaction as TransactionPB) + + transaction_pb = TransactionPB(id=self.TRANSACTION_ID) + database = _Database() + api = database.spanner_api = _FauxSpannerAPI( + _begin_transaction_response=transaction_pb) + duration = self._makeDuration(seconds=3, microseconds=123456) + session = _Session(database) + snapshot = self._make_one( + session, exact_staleness=duration, multi_use=True) + + txn_id = snapshot.begin() + + self.assertEqual(txn_id, self.TRANSACTION_ID) + self.assertEqual(snapshot._transaction_id, self.TRANSACTION_ID) + + session_id, txn_options, options = api._begun + self.assertEqual(session_id, session.name) + read_only = txn_options.read_only + self.assertEqual(read_only.exact_staleness.seconds, 3) + self.assertEqual(read_only.exact_staleness.nanos, 123456000) + self.assertEqual(options.kwargs['metadata'], + [('google-cloud-resource-prefix', database.name)]) + + def test_begin_ok_exact_strong(self): + from google.cloud.proto.spanner.v1.transaction_pb2 import ( + Transaction as TransactionPB) + + transaction_pb = TransactionPB(id=self.TRANSACTION_ID) + database = _Database() + api = database.spanner_api = _FauxSpannerAPI( + _begin_transaction_response=transaction_pb) + session = _Session(database) + snapshot = self._make_one(session, multi_use=True) + + txn_id = snapshot.begin() + + self.assertEqual(txn_id, self.TRANSACTION_ID) + self.assertEqual(snapshot._transaction_id, self.TRANSACTION_ID) + + session_id, txn_options, options = api._begun + self.assertEqual(session_id, session.name) + self.assertTrue(txn_options.read_only.strong) + self.assertEqual(options.kwargs['metadata'], + [('google-cloud-resource-prefix', database.name)]) + class _Session(object): @@ -443,7 +712,15 @@ class _Database(object): class _FauxSpannerAPI(_GAXBaseAPI): - _read_with = None + _read_with = _begin = None + + def begin_transaction(self, session, options_, options=None): + from google.gax.errors import GaxError + + self._begun = (session, options_, options) + if self._random_gax_error: + raise GaxError('error') + return self._begin_transaction_response # pylint: disable=too-many-arguments def streaming_read(self, session, table, columns, key_set, diff --git a/spanner/tests/unit/test_streamed.py b/spanner/tests/unit/test_streamed.py index edcace273f66..2e31f4dfad2c 100644 --- a/spanner/tests/unit/test_streamed.py +++ b/spanner/tests/unit/test_streamed.py @@ -15,6 +15,8 @@ import unittest +import mock + class TestStreamedResultSet(unittest.TestCase): @@ -30,6 +32,18 @@ def test_ctor_defaults(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) self.assertIs(streamed._response_iterator, iterator) + self.assertIsNone(streamed._source) + self.assertEqual(streamed.rows, []) + self.assertIsNone(streamed.metadata) + self.assertIsNone(streamed.stats) + self.assertIsNone(streamed.resume_token) + + def test_ctor_w_source(self): + iterator = _MockCancellableIterator() + source = object() + streamed = self._make_one(iterator, source=source) + self.assertIs(streamed._response_iterator, iterator) + self.assertIs(streamed._source, source) self.assertEqual(streamed.rows, []) self.assertIsNone(streamed.metadata) self.assertIsNone(streamed.stats) @@ -42,14 +56,14 @@ def test_fields_unset(self): _ = streamed.fields @staticmethod - def _makeScalarField(name, type_): + def _make_scalar_field(name, type_): from google.cloud.proto.spanner.v1.type_pb2 import StructType from google.cloud.proto.spanner.v1.type_pb2 import Type return StructType.Field(name=name, type=Type(code=type_)) @staticmethod - def _makeArrayField(name, element_type_code=None, element_type=None): + def _make_array_field(name, element_type_code=None, element_type=None): from google.cloud.proto.spanner.v1.type_pb2 import StructType from google.cloud.proto.spanner.v1.type_pb2 import Type @@ -60,7 +74,7 @@ def _makeArrayField(name, element_type_code=None, element_type=None): return StructType.Field(name=name, type=array_type) @staticmethod - def _makeStructType(struct_type_fields): + def _make_struct_type(struct_type_fields): from google.cloud.proto.spanner.v1.type_pb2 import StructType from google.cloud.proto.spanner.v1.type_pb2 import Type @@ -72,13 +86,13 @@ def _makeStructType(struct_type_fields): return Type(code='STRUCT', struct_type=struct_type) @staticmethod - def _makeValue(value): + def _make_value(value): from google.cloud.spanner._helpers import _make_value_pb return _make_value_pb(value) @staticmethod - def _makeListValue(values=(), value_pbs=None): + def _make_list_value(values=(), value_pbs=None): from google.protobuf.struct_pb2 import ListValue from google.protobuf.struct_pb2 import Value from google.cloud.spanner._helpers import _make_list_value_pb @@ -87,15 +101,52 @@ def _makeListValue(values=(), value_pbs=None): return Value(list_value=ListValue(values=value_pbs)) return Value(list_value=_make_list_value_pb(values)) + @staticmethod + def _make_result_set_metadata(fields=(), transaction_id=None): + from google.cloud.proto.spanner.v1.result_set_pb2 import ( + ResultSetMetadata) + metadata = ResultSetMetadata() + for field in fields: + metadata.row_type.fields.add().CopyFrom(field) + if transaction_id is not None: + metadata.transaction.id = transaction_id + return metadata + + @staticmethod + def _make_result_set_stats(query_plan=None, **kw): + from google.cloud.proto.spanner.v1.result_set_pb2 import ( + ResultSetStats) + from google.protobuf.struct_pb2 import Struct + from google.cloud.spanner._helpers import _make_value_pb + + query_stats = Struct(fields={ + key: _make_value_pb(value) for key, value in kw.items()}) + return ResultSetStats( + query_plan=query_plan, + query_stats=query_stats, + ) + + @staticmethod + def _make_partial_result_set( + values, metadata=None, stats=None, chunked_value=False): + from google.cloud.proto.spanner.v1.result_set_pb2 import ( + PartialResultSet) + return PartialResultSet( + values=values, + metadata=metadata, + stats=stats, + chunked_value=chunked_value, + ) + def test_properties_set(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), ] - metadata = streamed._metadata = _ResultSetMetadataPB(FIELDS) - stats = streamed._stats = _ResultSetStatsPB() + metadata = streamed._metadata = self._make_result_set_metadata(FIELDS) + stats = streamed._stats = self._make_result_set_stats() self.assertEqual(list(streamed.fields), FIELDS) self.assertIs(streamed.metadata, metadata) self.assertIs(streamed.stats, stats) @@ -106,11 +157,11 @@ def test__merge_chunk_bool(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('registered_voter', 'BOOL'), + self._make_scalar_field('registered_voter', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(True) - chunk = self._makeValue(False) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(True) + chunk = self._make_value(False) with self.assertRaises(Unmergeable): streamed._merge_chunk(chunk) @@ -119,11 +170,11 @@ def test__merge_chunk_int64(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('age', 'INT64'), + self._make_scalar_field('age', 'INT64'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(42) - chunk = self._makeValue(13) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(42) + chunk = self._make_value(13) merged = streamed._merge_chunk(chunk) self.assertEqual(merged.string_value, '4213') @@ -133,11 +184,11 @@ def test__merge_chunk_float64_nan_string(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('weight', 'FLOAT64'), + self._make_scalar_field('weight', 'FLOAT64'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(u'Na') - chunk = self._makeValue(u'N') + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(u'Na') + chunk = self._make_value(u'N') merged = streamed._merge_chunk(chunk) self.assertEqual(merged.string_value, u'NaN') @@ -146,11 +197,11 @@ def test__merge_chunk_float64_w_empty(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('weight', 'FLOAT64'), + self._make_scalar_field('weight', 'FLOAT64'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(3.14159) - chunk = self._makeValue('') + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(3.14159) + chunk = self._make_value('') merged = streamed._merge_chunk(chunk) self.assertEqual(merged.number_value, 3.14159) @@ -161,11 +212,11 @@ def test__merge_chunk_float64_w_float64(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('weight', 'FLOAT64'), + self._make_scalar_field('weight', 'FLOAT64'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(3.14159) - chunk = self._makeValue(2.71828) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(3.14159) + chunk = self._make_value(2.71828) with self.assertRaises(Unmergeable): streamed._merge_chunk(chunk) @@ -174,11 +225,11 @@ def test__merge_chunk_string(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('name', 'STRING'), + self._make_scalar_field('name', 'STRING'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(u'phred') - chunk = self._makeValue(u'wylma') + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(u'phred') + chunk = self._make_value(u'wylma') merged = streamed._merge_chunk(chunk) @@ -189,11 +240,11 @@ def test__merge_chunk_string_w_bytes(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('image', 'BYTES'), + self._make_scalar_field('image', 'BYTES'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(u'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACXBIWXMAAAsTAAALEwEAmpwYAAAA\n') - chunk = self._makeValue(u'B3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0FNUExFMG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n') + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(u'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACXBIWXMAAAsTAAALEwEAmpwYAAAA\n') + chunk = self._make_value(u'B3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0FNUExFMG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n') merged = streamed._merge_chunk(chunk) @@ -204,15 +255,15 @@ def test__merge_chunk_array_of_bool(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeArrayField('name', element_type_code='BOOL'), + self._make_array_field('name', element_type_code='BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue([True, True]) - chunk = self._makeListValue([False, False, False]) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value([True, True]) + chunk = self._make_list_value([False, False, False]) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue([True, True, False, False, False]) + expected = self._make_list_value([True, True, False, False, False]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -220,15 +271,15 @@ def test__merge_chunk_array_of_int(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeArrayField('name', element_type_code='INT64'), + self._make_array_field('name', element_type_code='INT64'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue([0, 1, 2]) - chunk = self._makeListValue([3, 4, 5]) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value([0, 1, 2]) + chunk = self._make_list_value([3, 4, 5]) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue([0, 1, 23, 4, 5]) + expected = self._make_list_value([0, 1, 23, 4, 5]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -242,15 +293,15 @@ def test__merge_chunk_array_of_float(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeArrayField('name', element_type_code='FLOAT64'), + self._make_array_field('name', element_type_code='FLOAT64'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue([PI, SQRT_2]) - chunk = self._makeListValue(['', EULER, LOG_10]) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value([PI, SQRT_2]) + chunk = self._make_list_value(['', EULER, LOG_10]) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue([PI, SQRT_2, EULER, LOG_10]) + expected = self._make_list_value([PI, SQRT_2, EULER, LOG_10]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -258,15 +309,15 @@ def test__merge_chunk_array_of_string(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeArrayField('name', element_type_code='STRING'), + self._make_array_field('name', element_type_code='STRING'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue([u'A', u'B', u'C']) - chunk = self._makeListValue([None, u'D', u'E']) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value([u'A', u'B', u'C']) + chunk = self._make_list_value([None, u'D', u'E']) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue([u'A', u'B', u'C', None, u'D', u'E']) + expected = self._make_list_value([u'A', u'B', u'C', None, u'D', u'E']) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -274,15 +325,15 @@ def test__merge_chunk_array_of_string_with_null(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeArrayField('name', element_type_code='STRING'), + self._make_array_field('name', element_type_code='STRING'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue([u'A', u'B', u'C']) - chunk = self._makeListValue([u'D', u'E']) + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value([u'A', u'B', u'C']) + chunk = self._make_list_value([u'D', u'E']) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue([u'A', u'B', u'CD', u'E']) + expected = self._make_list_value([u'A', u'B', u'CD', u'E']) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -298,22 +349,22 @@ def test__merge_chunk_array_of_array_of_int(self): FIELDS = [ StructType.Field(name='loloi', type=array_type) ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue(value_pbs=[ - self._makeListValue([0, 1]), - self._makeListValue([2]), + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value(value_pbs=[ + self._make_list_value([0, 1]), + self._make_list_value([2]), ]) - chunk = self._makeListValue(value_pbs=[ - self._makeListValue([3]), - self._makeListValue([4, 5]), + chunk = self._make_list_value(value_pbs=[ + self._make_list_value([3]), + self._make_list_value([4, 5]), ]) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue(value_pbs=[ - self._makeListValue([0, 1]), - self._makeListValue([23]), - self._makeListValue([4, 5]), + expected = self._make_list_value(value_pbs=[ + self._make_list_value([0, 1]), + self._make_list_value([23]), + self._make_list_value([4, 5]), ]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -330,22 +381,22 @@ def test__merge_chunk_array_of_array_of_string(self): FIELDS = [ StructType.Field(name='lolos', type=array_type) ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeListValue(value_pbs=[ - self._makeListValue([u'A', u'B']), - self._makeListValue([u'C']), + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_list_value(value_pbs=[ + self._make_list_value([u'A', u'B']), + self._make_list_value([u'C']), ]) - chunk = self._makeListValue(value_pbs=[ - self._makeListValue([u'D']), - self._makeListValue([u'E', u'F']), + chunk = self._make_list_value(value_pbs=[ + self._make_list_value([u'D']), + self._make_list_value([u'E', u'F']), ]) merged = streamed._merge_chunk(chunk) - expected = self._makeListValue(value_pbs=[ - self._makeListValue([u'A', u'B']), - self._makeListValue([u'CD']), - self._makeListValue([u'E', u'F']), + expected = self._make_list_value(value_pbs=[ + self._make_list_value([u'A', u'B']), + self._make_list_value([u'CD']), + self._make_list_value([u'E', u'F']), ]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -353,47 +404,47 @@ def test__merge_chunk_array_of_array_of_string(self): def test__merge_chunk_array_of_struct(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) - struct_type = self._makeStructType([ + struct_type = self._make_struct_type([ ('name', 'STRING'), ('age', 'INT64'), ]) FIELDS = [ - self._makeArrayField('test', element_type=struct_type), + self._make_array_field('test', element_type=struct_type), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - partial = self._makeListValue([u'Phred ']) - streamed._pending_chunk = self._makeListValue(value_pbs=[partial]) - rest = self._makeListValue([u'Phlyntstone', 31]) - chunk = self._makeListValue(value_pbs=[rest]) + streamed._metadata = self._make_result_set_metadata(FIELDS) + partial = self._make_list_value([u'Phred ']) + streamed._pending_chunk = self._make_list_value(value_pbs=[partial]) + rest = self._make_list_value([u'Phlyntstone', 31]) + chunk = self._make_list_value(value_pbs=[rest]) merged = streamed._merge_chunk(chunk) - struct = self._makeListValue([u'Phred Phlyntstone', 31]) - expected = self._makeListValue(value_pbs=[struct]) + struct = self._make_list_value([u'Phred Phlyntstone', 31]) + expected = self._make_list_value(value_pbs=[struct]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) def test__merge_chunk_array_of_struct_unmergeable(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) - struct_type = self._makeStructType([ + struct_type = self._make_struct_type([ ('name', 'STRING'), ('registered', 'BOOL'), ('voted', 'BOOL'), ]) FIELDS = [ - self._makeArrayField('test', element_type=struct_type), + self._make_array_field('test', element_type=struct_type), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) - partial = self._makeListValue([u'Phred Phlyntstone', True]) - streamed._pending_chunk = self._makeListValue(value_pbs=[partial]) - rest = self._makeListValue([True]) - chunk = self._makeListValue(value_pbs=[rest]) + streamed._metadata = self._make_result_set_metadata(FIELDS) + partial = self._make_list_value([u'Phred Phlyntstone', True]) + streamed._pending_chunk = self._make_list_value(value_pbs=[partial]) + rest = self._make_list_value([True]) + chunk = self._make_list_value(value_pbs=[rest]) merged = streamed._merge_chunk(chunk) - struct = self._makeListValue([u'Phred Phlyntstone', True, True]) - expected = self._makeListValue(value_pbs=[struct]) + struct = self._make_list_value([u'Phred Phlyntstone', True, True]) + expected = self._make_list_value(value_pbs=[struct]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -401,11 +452,11 @@ def test_merge_values_empty_and_empty(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) streamed._current_row = [] streamed._merge_values([]) self.assertEqual(streamed.rows, []) @@ -415,13 +466,13 @@ def test_merge_values_empty_and_partial(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BARE = [u'Phred Phlyntstone', 42] - VALUES = [self._makeValue(bare) for bare in BARE] + VALUES = [self._make_value(bare) for bare in BARE] streamed._current_row = [] streamed._merge_values(VALUES) self.assertEqual(streamed.rows, []) @@ -431,13 +482,13 @@ def test_merge_values_empty_and_filled(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BARE = [u'Phred Phlyntstone', 42, True] - VALUES = [self._makeValue(bare) for bare in BARE] + VALUES = [self._make_value(bare) for bare in BARE] streamed._current_row = [] streamed._merge_values(VALUES) self.assertEqual(streamed.rows, [BARE]) @@ -447,17 +498,17 @@ def test_merge_values_empty_and_filled_plus(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BARE = [ u'Phred Phlyntstone', 42, True, u'Bharney Rhubble', 39, True, u'Wylma Phlyntstone', ] - VALUES = [self._makeValue(bare) for bare in BARE] + VALUES = [self._make_value(bare) for bare in BARE] streamed._current_row = [] streamed._merge_values(VALUES) self.assertEqual(streamed.rows, [BARE[0:3], BARE[3:6]]) @@ -467,11 +518,11 @@ def test_merge_values_partial_and_empty(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [ u'Phred Phlyntstone' ] @@ -484,15 +535,15 @@ def test_merge_values_partial_and_partial(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [u'Phred Phlyntstone'] streamed._current_row[:] = BEFORE MERGED = [42] - TO_MERGE = [self._makeValue(item) for item in MERGED] + TO_MERGE = [self._make_value(item) for item in MERGED] streamed._merge_values(TO_MERGE) self.assertEqual(streamed.rows, []) self.assertEqual(streamed._current_row, BEFORE + MERGED) @@ -501,17 +552,17 @@ def test_merge_values_partial_and_filled(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [ u'Phred Phlyntstone' ] streamed._current_row[:] = BEFORE MERGED = [42, True] - TO_MERGE = [self._makeValue(item) for item in MERGED] + TO_MERGE = [self._make_value(item) for item in MERGED] streamed._merge_values(TO_MERGE) self.assertEqual(streamed.rows, [BEFORE + MERGED]) self.assertEqual(streamed._current_row, []) @@ -520,13 +571,13 @@ def test_merge_values_partial_and_filled_plus(self): iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [ - self._makeValue(u'Phred Phlyntstone') + self._make_value(u'Phred Phlyntstone') ] streamed._current_row[:] = BEFORE MERGED = [ @@ -534,7 +585,7 @@ def test_merge_values_partial_and_filled_plus(self): u'Bharney Rhubble', 39, True, u'Wylma Phlyntstone', ] - TO_MERGE = [self._makeValue(item) for item in MERGED] + TO_MERGE = [self._make_value(item) for item in MERGED] VALUES = BEFORE + MERGED streamed._merge_values(TO_MERGE) self.assertEqual(streamed.rows, [VALUES[0:3], VALUES[3:6]]) @@ -547,36 +598,62 @@ def test_consume_next_empty(self): streamed.consume_next() def test_consume_next_first_set_partial(self): + TXN_ID = b'DEADBEEF' FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) + metadata = self._make_result_set_metadata( + FIELDS, transaction_id=TXN_ID) BARE = [u'Phred Phlyntstone', 42] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set = _PartialResultSetPB(VALUES, metadata=metadata) + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES, metadata=metadata) iterator = _MockCancellableIterator(result_set) - streamed = self._make_one(iterator) + source = mock.Mock(_transaction_id=None, spec=['_transaction_id']) + streamed = self._make_one(iterator, source=source) streamed.consume_next() self.assertEqual(streamed.rows, []) self.assertEqual(streamed._current_row, BARE) - self.assertIs(streamed.metadata, metadata) + self.assertEqual(streamed.metadata, metadata) + self.assertEqual(streamed.resume_token, result_set.resume_token) + self.assertEqual(source._transaction_id, TXN_ID) + + def test_consume_next_first_set_partial_existing_txn_id(self): + TXN_ID = b'DEADBEEF' + FIELDS = [ + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), + ] + metadata = self._make_result_set_metadata( + FIELDS, transaction_id=b'') + BARE = [u'Phred Phlyntstone', 42] + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES, metadata=metadata) + iterator = _MockCancellableIterator(result_set) + source = mock.Mock(_transaction_id=TXN_ID, spec=['_transaction_id']) + streamed = self._make_one(iterator, source=source) + streamed.consume_next() + self.assertEqual(streamed.rows, []) + self.assertEqual(streamed._current_row, BARE) + self.assertEqual(streamed.metadata, metadata) self.assertEqual(streamed.resume_token, result_set.resume_token) + self.assertEqual(source._transaction_id, TXN_ID) def test_consume_next_w_partial_result(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] VALUES = [ - self._makeValue(u'Phred '), + self._make_value(u'Phred '), ] - result_set = _PartialResultSetPB(VALUES, chunked_value=True) + result_set = self._make_partial_result_set(VALUES, chunked_value=True) iterator = _MockCancellableIterator(result_set) streamed = self._make_one(iterator) - streamed._metadata = _ResultSetMetadataPB(FIELDS) + streamed._metadata = self._make_result_set_metadata(FIELDS) streamed.consume_next() self.assertEqual(streamed.rows, []) self.assertEqual(streamed._current_row, []) @@ -585,21 +662,21 @@ def test_consume_next_w_partial_result(self): def test_consume_next_w_pending_chunk(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] BARE = [ u'Phlyntstone', 42, True, u'Bharney Rhubble', 39, True, u'Wylma Phlyntstone', ] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set = _PartialResultSetPB(VALUES) + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES) iterator = _MockCancellableIterator(result_set) streamed = self._make_one(iterator) - streamed._metadata = _ResultSetMetadataPB(FIELDS) - streamed._pending_chunk = self._makeValue(u'Phred ') + streamed._metadata = self._make_result_set_metadata(FIELDS) + streamed._pending_chunk = self._make_value(u'Phred ') streamed.consume_next() self.assertEqual(streamed.rows, [ [u'Phred Phlyntstone', BARE[1], BARE[2]], @@ -611,26 +688,26 @@ def test_consume_next_w_pending_chunk(self): def test_consume_next_last_set(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) - stats = _ResultSetStatsPB( + metadata = self._make_result_set_metadata(FIELDS) + stats = self._make_result_set_stats( rows_returned="1", elapsed_time="1.23 secs", - cpu_tme="0.98 secs", + cpu_time="0.98 secs", ) BARE = [u'Phred Phlyntstone', 42, True] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set = _PartialResultSetPB(VALUES, stats=stats) + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES, stats=stats) iterator = _MockCancellableIterator(result_set) streamed = self._make_one(iterator) streamed._metadata = metadata streamed.consume_next() self.assertEqual(streamed.rows, [BARE]) self.assertEqual(streamed._current_row, []) - self.assertIs(streamed._stats, stats) + self.assertEqual(streamed._stats, stats) self.assertEqual(streamed.resume_token, result_set.resume_token) def test_consume_all_empty(self): @@ -640,36 +717,37 @@ def test_consume_all_empty(self): def test_consume_all_one_result_set_partial(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) + metadata = self._make_result_set_metadata(FIELDS) BARE = [u'Phred Phlyntstone', 42] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set = _PartialResultSetPB(VALUES, metadata=metadata) + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES, metadata=metadata) iterator = _MockCancellableIterator(result_set) streamed = self._make_one(iterator) streamed.consume_all() self.assertEqual(streamed.rows, []) self.assertEqual(streamed._current_row, BARE) - self.assertIs(streamed.metadata, metadata) + self.assertEqual(streamed.metadata, metadata) def test_consume_all_multiple_result_sets_filled(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) + metadata = self._make_result_set_metadata(FIELDS) BARE = [ u'Phred Phlyntstone', 42, True, u'Bharney Rhubble', 39, True, u'Wylma Phlyntstone', 41, True, ] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set1 = _PartialResultSetPB(VALUES[:4], metadata=metadata) - result_set2 = _PartialResultSetPB(VALUES[4:]) + VALUES = [self._make_value(bare) for bare in BARE] + result_set1 = self._make_partial_result_set( + VALUES[:4], metadata=metadata) + result_set2 = self._make_partial_result_set(VALUES[4:]) iterator = _MockCancellableIterator(result_set1, result_set2) streamed = self._make_one(iterator) streamed.consume_all() @@ -689,37 +767,38 @@ def test___iter___empty(self): def test___iter___one_result_set_partial(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) + metadata = self._make_result_set_metadata(FIELDS) BARE = [u'Phred Phlyntstone', 42] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set = _PartialResultSetPB(VALUES, metadata=metadata) + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES, metadata=metadata) iterator = _MockCancellableIterator(result_set) streamed = self._make_one(iterator) found = list(streamed) self.assertEqual(found, []) self.assertEqual(streamed.rows, []) self.assertEqual(streamed._current_row, BARE) - self.assertIs(streamed.metadata, metadata) + self.assertEqual(streamed.metadata, metadata) def test___iter___multiple_result_sets_filled(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) + metadata = self._make_result_set_metadata(FIELDS) BARE = [ u'Phred Phlyntstone', 42, True, u'Bharney Rhubble', 39, True, u'Wylma Phlyntstone', 41, True, ] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set1 = _PartialResultSetPB(VALUES[:4], metadata=metadata) - result_set2 = _PartialResultSetPB(VALUES[4:]) + VALUES = [self._make_value(bare) for bare in BARE] + result_set1 = self._make_partial_result_set( + VALUES[:4], metadata=metadata) + result_set2 = self._make_partial_result_set(VALUES[4:]) iterator = _MockCancellableIterator(result_set1, result_set2) streamed = self._make_one(iterator) found = list(streamed) @@ -734,11 +813,11 @@ def test___iter___multiple_result_sets_filled(self): def test___iter___w_existing_rows_read(self): FIELDS = [ - self._makeScalarField('full_name', 'STRING'), - self._makeScalarField('age', 'INT64'), - self._makeScalarField('married', 'BOOL'), + self._make_scalar_field('full_name', 'STRING'), + self._make_scalar_field('age', 'INT64'), + self._make_scalar_field('married', 'BOOL'), ] - metadata = _ResultSetMetadataPB(FIELDS) + metadata = self._make_result_set_metadata(FIELDS) ALREADY = [ [u'Pebbylz Phlyntstone', 4, False], [u'Dino Rhubble', 4, False], @@ -748,9 +827,10 @@ def test___iter___w_existing_rows_read(self): u'Bharney Rhubble', 39, True, u'Wylma Phlyntstone', 41, True, ] - VALUES = [self._makeValue(bare) for bare in BARE] - result_set1 = _PartialResultSetPB(VALUES[:4], metadata=metadata) - result_set2 = _PartialResultSetPB(VALUES[4:]) + VALUES = [self._make_value(bare) for bare in BARE] + result_set1 = self._make_partial_result_set( + VALUES[:4], metadata=metadata) + result_set2 = self._make_partial_result_set(VALUES[4:]) iterator = _MockCancellableIterator(result_set1, result_set2) streamed = self._make_one(iterator) streamed._rows[:] = ALREADY @@ -779,40 +859,6 @@ def __next__(self): # pragma: NO COVER Py3k return self.next() -class _ResultSetMetadataPB(object): - - def __init__(self, fields): - from google.cloud.proto.spanner.v1.type_pb2 import StructType - - self.row_type = StructType(fields=fields) - - -class _ResultSetStatsPB(object): - - def __init__(self, query_plan=None, **query_stats): - from google.protobuf.struct_pb2 import Struct - from google.cloud.spanner._helpers import _make_value_pb - - self.query_plan = query_plan - self.query_stats = Struct(fields={ - key: _make_value_pb(value) for key, value in query_stats.items()}) - - -class _PartialResultSetPB(object): - - resume_token = b'DEADBEEF' - - def __init__(self, values, metadata=None, stats=None, chunked_value=False): - self.values = values - self.metadata = metadata - self.stats = stats - self.chunked_value = chunked_value - - def HasField(self, name): - assert name == 'stats' - return self.stats is not None - - class TestStreamedResultSet_JSON_acceptance_tests(unittest.TestCase): _json_tests = None diff --git a/spanner/tests/unit/test_transaction.py b/spanner/tests/unit/test_transaction.py index 997f4d5153c8..1337cb8b3306 100644 --- a/spanner/tests/unit/test_transaction.py +++ b/spanner/tests/unit/test_transaction.py @@ -49,9 +49,10 @@ def test_ctor_defaults(self): session = _Session() transaction = self._make_one(session) self.assertIs(transaction._session, session) - self.assertIsNone(transaction._id) + self.assertIsNone(transaction._transaction_id) self.assertIsNone(transaction.committed) - self.assertEqual(transaction._rolled_back, False) + self.assertFalse(transaction._rolled_back) + self.assertTrue(transaction._multi_use) def test__check_state_not_begun(self): session = _Session() @@ -62,7 +63,7 @@ def test__check_state_not_begun(self): def test__check_state_already_committed(self): session = _Session() transaction = self._make_one(session) - transaction._id = b'DEADBEEF' + transaction._transaction_id = self.TRANSACTION_ID transaction.committed = object() with self.assertRaises(ValueError): transaction._check_state() @@ -70,7 +71,7 @@ def test__check_state_already_committed(self): def test__check_state_already_rolled_back(self): session = _Session() transaction = self._make_one(session) - transaction._id = b'DEADBEEF' + transaction._transaction_id = self.TRANSACTION_ID transaction._rolled_back = True with self.assertRaises(ValueError): transaction._check_state() @@ -78,20 +79,20 @@ def test__check_state_already_rolled_back(self): def test__check_state_ok(self): session = _Session() transaction = self._make_one(session) - transaction._id = b'DEADBEEF' + transaction._transaction_id = self.TRANSACTION_ID transaction._check_state() # does not raise def test__make_txn_selector(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID selector = transaction._make_txn_selector() self.assertEqual(selector.id, self.TRANSACTION_ID) def test_begin_already_begun(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID with self.assertRaises(ValueError): transaction.begin() @@ -141,7 +142,7 @@ def test_begin_ok(self): txn_id = transaction.begin() self.assertEqual(txn_id, self.TRANSACTION_ID) - self.assertEqual(transaction._id, self.TRANSACTION_ID) + self.assertEqual(transaction._transaction_id, self.TRANSACTION_ID) session_id, txn_options, options = api._begun self.assertEqual(session_id, session.name) @@ -158,7 +159,7 @@ def test_rollback_not_begun(self): def test_rollback_already_committed(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction.committed = object() with self.assertRaises(ValueError): transaction.rollback() @@ -166,7 +167,7 @@ def test_rollback_already_committed(self): def test_rollback_already_rolled_back(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction._rolled_back = True with self.assertRaises(ValueError): transaction.rollback() @@ -179,7 +180,7 @@ def test_rollback_w_gax_error(self): _random_gax_error=True) session = _Session(database) transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction.insert(TABLE_NAME, COLUMNS, VALUES) with self.assertRaises(GaxError): @@ -202,7 +203,7 @@ def test_rollback_ok(self): _rollback_response=empty_pb) session = _Session(database) transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction.replace(TABLE_NAME, COLUMNS, VALUES) transaction.rollback() @@ -224,7 +225,7 @@ def test_commit_not_begun(self): def test_commit_already_committed(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction.committed = object() with self.assertRaises(ValueError): transaction.commit() @@ -232,7 +233,7 @@ def test_commit_already_committed(self): def test_commit_already_rolled_back(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction._rolled_back = True with self.assertRaises(ValueError): transaction.commit() @@ -240,7 +241,7 @@ def test_commit_already_rolled_back(self): def test_commit_no_mutations(self): session = _Session() transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID with self.assertRaises(ValueError): transaction.commit() @@ -252,7 +253,7 @@ def test_commit_w_gax_error(self): _random_gax_error=True) session = _Session(database) transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction.replace(TABLE_NAME, COLUMNS, VALUES) with self.assertRaises(GaxError): @@ -284,7 +285,7 @@ def test_commit_ok(self): _commit_response=response) session = _Session(database) transaction = self._make_one(session) - transaction._id = self.TRANSACTION_ID + transaction._transaction_id = self.TRANSACTION_ID transaction.delete(TABLE_NAME, keyset) transaction.commit()