feat: add BigQueryWriteClient where append_rows returns a helper for writing rows #278

Closed · wants to merge 22 commits
64 changes: 62 additions & 2 deletions google/cloud/bigquery_storage_v1beta2/client.py
@@ -19,12 +19,18 @@
This is the base from which all interactions with the API occur.
"""

from __future__ import absolute_import
from typing import Sequence, Tuple

import google.api_core.gapic_v1.method
import google.api_core.retry

from google.cloud.bigquery_storage_v1 import reader
from google.cloud.bigquery_storage_v1beta2.services import big_query_read
from google.cloud.bigquery_storage_v1beta2.services import (
big_query_read,
big_query_write,
)
from google.cloud.bigquery_storage_v1beta2 import types
from google.cloud.bigquery_storage_v1beta2 import writer


_SCOPES = (
@@ -135,3 +141,57 @@ def read_rows(
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
)


class BigQueryWriteClient(big_query_write.BigQueryWriteClient):
"""BigQuery Write API.

The Write API can be used to write data to BigQuery.
"""

def append_rows(
self,
initial_request: types.AppendRowsRequest,
# TODO: add retry and timeout arguments. Blocked by
# https://github.com/googleapis/python-api-core/issues/262
metadata: Sequence[Tuple[str, str]] = (),
) -> Tuple[writer.AppendRowsStream, writer.AppendRowsFuture]:
"""Append data to a given stream.

If ``offset`` is specified, the ``offset`` is checked against
the end of the stream. The server returns ``OUT_OF_RANGE`` in
``AppendRowsResponse`` if an attempt is made to append to an
offset beyond the current end of the stream, or
``ALREADY_EXISTS`` if the user provides an ``offset`` that has
already been written to. The user can retry with an adjusted
offset within the same RPC stream. If ``offset`` is not
specified, the append happens at the end of the stream.

The response contains the offset at which the append happened.
Responses are received in the same order in which requests are
sent. There will be one response for each successful request. If
the ``offset`` is not set in a response, the append did not
happen due to an error. If one request fails, all subsequent
requests will also fail until a successful request is made
again.

If the stream is of ``PENDING`` type, data will only be
available for read operations after the stream is committed.

Args:
initial_request:
The initial request message for `AppendRows`. Must contain the
stream name and data descriptor.
metadata (Sequence[Tuple[str, str]]):
Strings which should be sent along with the request as
metadata.

Returns:
A tuple containing a stream and a future. Use the stream to send
additional requests. Close it when finished. Use the future to wait
for the initial request to complete.
"""
gapic_client = super(BigQueryWriteClient, self)
stream = writer.AppendRowsStream(gapic_client, metadata)
initial_response_future = stream.open(initial_request)
return stream, initial_response_future
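
For context, here is a minimal usage sketch of the new surface. It assumes the class is exported at the package level like the existing BigQueryReadClient; my_record_pb2 is a hypothetical compiled protobuf module for the row format, and error handling (plus the finalize/commit steps a PENDING stream needs) is omitted.

from google.cloud import bigquery_storage_v1beta2
from google.cloud.bigquery_storage_v1beta2 import types
from google.protobuf import descriptor_pb2

import my_record_pb2  # hypothetical generated module for the row proto

write_client = bigquery_storage_v1beta2.BigQueryWriteClient()
parent = write_client.table_path("my-project", "my_dataset", "my_table")
write_stream = write_client.create_write_stream(
    parent=parent,
    write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
)

# Describe the serialized rows using the generated message's descriptor.
proto_descriptor = descriptor_pb2.DescriptorProto()
my_record_pb2.MyRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_rows = types.ProtoRows(
    serialized_rows=[my_record_pb2.MyRecord(name="row 1").SerializeToString()]
)

# The initial request must carry the stream name and the writer schema.
initial_request = types.AppendRowsRequest(
    write_stream=write_stream.name,
    proto_rows=types.AppendRowsRequest.ProtoData(
        writer_schema=types.ProtoSchema(proto_descriptor=proto_descriptor),
        rows=proto_rows,
    ),
)

stream, initial_future = write_client.append_rows(initial_request)
response = initial_future.result()  # block until the first append completes
stream.close()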
271 changes: 271 additions & 0 deletions google/cloud/bigquery_storage_v1beta2/writer.py
@@ -0,0 +1,271 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import division

import concurrent.futures
import itertools
import logging
import queue
import threading
from typing import Optional, Sequence, Tuple

import grpc

from google.api_core import bidi
from google.api_core.future import polling as polling_future
from google.api_core import exceptions
import google.api_core.retry
from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
from google.cloud.bigquery_storage_v1beta2.services import big_query_write

_LOGGER = logging.getLogger(__name__)
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"


def _wrap_as_exception(maybe_exception):
"""Wrap an object as a Python exception, if needed.
Args:
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
Returns:
The argument itself if an instance of ``BaseException``, otherwise
the argument represented as an instance of ``Exception`` (sub)class.
"""
if isinstance(maybe_exception, grpc.RpcError):
return exceptions.from_grpc_error(maybe_exception)
elif isinstance(maybe_exception, BaseException):
return maybe_exception

return Exception(maybe_exception)


def _wrap_callback_errors(callback, on_callback_error, message):
"""Wraps a user callback so that if an exception occurs the message is
nacked.
Args:
callback (Callable[None, Message]): The user callback.
message (~Message): The Pub/Sub message.
"""
try:
callback(message)
except Exception as exc:
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
# unrecoverable state and this thread should just bail.
_LOGGER.exception(
"Top-level exception occurred in callback while processing a message"
)
message.nack()
on_callback_error(exc)


class AppendRowsStream(object):
"""Does _not_ automatically resume, since it might be a stream where offset
should not be set, like the _default stream.

TODO: Then again, maybe it makes sense to retry the last unacknowledge message
always? If you write to _default / without offset, then you get semantics
more like REST streaming method.
"""

def __init__(
self,
client: big_query_write.BigQueryWriteClient,
metadata: Sequence[Tuple[str, str]] = (),
):
self._client = client
self._closing = threading.Lock()
self._closed = False
self._close_callbacks = []
self._futures_queue = queue.Queue()
self._initial_request = None
self._metadata = metadata
self._rpc = None
self._stream_name = None

# The threads created in ``.open()``.
self._consumer = None

@property
def is_active(self):
"""bool: True if this manager is actively streaming.

Note that ``False`` does not indicate that this is completely shut
down, just that it has stopped getting new messages.
"""
return self._consumer is not None and self._consumer.is_active

def open(
self,
# TODO: Why does the code this was copied from have callback and on_callback_error arguments?
initial_request: gapic_types.AppendRowsRequest,
):
# TODO: Error or no-op if already open?
if self.is_active:
raise ValueError("This manager is already open.")

if self._closed:
raise ValueError("This manager has been closed and can not be re-used.")

# TODO: _initial_request should be a callable to allow for stream
# resumption based on the oldest request that hasn't been processed.
self._initial_request = initial_request
self._stream_name = initial_request.write_stream
# TODO: save trace ID too for _initial_request callable.

# TODO: Do I need to avoid this when resuming stream?
initial_response_future = AppendRowsFuture()
self._futures_queue.put(initial_response_future)

self._rpc = bidi.BidiRpc(
self._client.append_rows,
initial_request=self._initial_request,
# TODO: allow additional metadata
# This header is required so that the BigQuery Storage API knows which
# region to route the request to.
metadata=tuple(
itertools.chain(
self._metadata,
(("x-goog-request-params", f"write_stream={self._stream_name}"),),
)
),
)

self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
self._consumer.start()

# Make sure RPC has started before returning.
# Without this, consumers may get:
#
# ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
# TODO: add timeout / failure / sleep
while not self._rpc.is_active:
pass
Comment on lines +150 to +158
tswast (Contributor Author):

@plamut This PR still has a lot of work to do before it's in a mergeable state, but I'd like your advice on this. I didn't see how it was handled in Pub/Sub.

In BackgroundConsumer, I see they wait until the thread has started https://github.com/googleapis/python-api-core/blob/40f52bf1100cf56b7f9af267d210b8a72fc34f08/google/api_core/bidi.py#L693 but apparently not long enough for the RPC itself to have started.

Maybe there is another way I should be sending requests besides accessing the _rpc.send(request) method directly?

tswast (Contributor Author):

Or maybe I'm not seeing it in Pub/Sub because _rpc.send is only used for a heartbeat? Perhaps folks in Firestore have advice on how to block until we're ready to send requests?

Or maybe send should just be queuing requests somehow?

CC @crwilcox

plamut (Contributor), Aug 26, 2021:

@tswast You're asking about sending requests over the stream, IIUC?

I don't think Pub/Sub had to solve this in any way, as the only requests sent over the stream are the heartbeats to keep the stream alive. And yes, they invoke _rpc.send() directly (with a prior "is active" check), but that's about it. Other requests such as ACK and MODACK are sent as unary requests outside of the stream.

However, send() does queue requests if I'm not mistaken. BidiRpc.send() just queues the request and that's it.

The request queue itself is wrapped in _RequestQueueGenerator and passed as an argument to a stream-stream RPC call. I imagine that the gRPC machinery iterates over these requests and sends them to the server over the stream.

> Perhaps folks in Firestore have advice on how to block until we're ready to send requests?

It appears that we don't have to worry whether the RPC has started yet or not, as requests are put in the queue and the RPC starts taking them out when it's ready. Or did I misunderstand the question?

tswast (Contributor Author):

The problem is that it's not queueing. I get ValueError: Can not send() on an RPC that has never been open()ed. if I try to send without waiting first.

tswast (Contributor Author):

This busy wait loop is rather scary (could result in infinite hang if things go wrong) so I'd rather avoid it.

plamut (Contributor):

Oh... and presumably without the "is active" check?

if self._rpc is not None and self._rpc.is_active:
    self._rpc.send(...)

If that doesn't cut it, then I don't know, sorry. I suppose in Pub/Sub we don't even hit this problem, because as you said, only occasional heartbeats are sent over the stream.

plamut (Contributor):

I see, and yeah, it would be nice if we could avoid it. Fortunately Pub/Sub didn't have to deal with that 🙃, but I guess the Firestore folks will know more.

tswast (Contributor Author):

Unfortunately, it appears to me that the Firestore use case is rather similar to Pub/Sub in that I don't see anywhere where they actually send requests over the _rpc. I suppose that makes sense given the class is named "Watch" https://github.com/googleapis/python-firestore/blob/main/google/cloud/firestore_v1/watch.py

plamut (Contributor):

> This busy wait loop is rather scary (could result in infinite hang if things go wrong) so I'd rather avoid it.

What about polling with a (bounded) exponential retry up to some reasonable threshold? If there's no better way...

@lidizheng Do you know if there's anything in gRPC (event, callback...) that would allow blocking until the stream is ready? I.e. without having to poll with call.is_active()?
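
A rough sketch of that bounded-backoff polling idea (a hypothetical helper with arbitrary constants, not part of this PR):

import time


def _wait_until_active(rpc, initial_delay=0.1, max_delay=2.0, deadline=60.0):
    """Poll ``rpc.is_active`` with bounded exponential backoff.

    Raises TimeoutError if the RPC does not become active within
    ``deadline`` seconds.
    """
    delay = initial_delay
    elapsed = 0.0
    while not rpc.is_active:
        if elapsed >= deadline:
            raise TimeoutError("RPC did not become active before the deadline.")
        time.sleep(delay)
        elapsed += delay
        delay = min(delay * 2, max_delay)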

lidizheng:

We have channel_ready_future for channel-level readiness. But for calls, we currently don't have an API to block until a message is available. This design is more or less related to the read buffer: we don't want to cache too many messages without sending them to the application. Today, we only cache 1 extra message, and after it is sent to the application (or the gRPC Python layer), the message buffer is removed.

On the other hand, if we want this feature now, it can be done via interceptors (block_until_ready == read_and_cache) or another thread. Or, if there is a strong need for this API, we can design some mechanism to block the stream until a read is available.
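
For reference, the channel-level wait mentioned above looks roughly like this (placeholder target; it only guarantees that the channel, not the call, is ready):

import grpc

channel = grpc.insecure_channel("localhost:50051")  # placeholder target

try:
    # Block until the channel is connected, or give up after 10 seconds.
    grpc.channel_ready_future(channel).result(timeout=10)
except grpc.FutureTimeoutError:
    raise RuntimeError("gRPC channel did not become ready in time.")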


return initial_response_future

def send(self, request: gapic_types.AppendRowsRequest):
"""TODO: document this!
"""
if self._closed:
raise ValueError("This manager has been closed and can not be used.")

# https://github.com/googleapis/python-pubsub/blob/master/google/cloud/pubsub_v1/subscriber/client.py#L228-L244
# TODO: Return a future that waits for response. Maybe should be async?
# https://github.com/googleapis/python-api-core/blob/master/google/api_core/future/polling.py
# create future, add to queue
future = AppendRowsFuture()
self._futures_queue.put(future)
self._rpc.send(request)
return future

def _on_response(self, response: gapic_types.AppendRowsResponse):
"""Process a response from a consumer callback."""
# Since we have 1 response per request, if we get here from a response
# callback, the queue should never be empty.
future: AppendRowsFuture = self._futures_queue.get()
# TODO: look at error and process (set exception or ignore "ALREADY_EXISTS")
future.set_result(response)

def close(self, reason=None):
"""Stop consuming messages and shutdown all helper threads.

This method is idempotent. Additional calls will have no effect.
The method does not block, it delegates the shutdown operations to a background
thread.

Args:
reason (Any): The reason to close this. If ``None``, this is considered
an "intentional" shutdown. This is passed to the callbacks
specified via :meth:`add_close_callback`.
"""
self._regular_shutdown_thread = threading.Thread(
name=_REGULAR_SHUTDOWN_THREAD_NAME,
daemon=True,
target=self._shutdown,
kwargs={"reason": reason},
)
self._regular_shutdown_thread.start()

def _shutdown(self, reason=None):
"""Run the actual shutdown sequence (stop the stream and all helper threads).

Args:
reason (Any): The reason to close the stream. If ``None``, this is
considered an "intentional" shutdown.
"""
with self._closing:
if self._closed:
return

# TODO: Should we wait for responses? What if the queue is not empty?
# Should we mark all those futures as done / failed?
# We are on a background thread, so maybe we should wait?
# Stop consuming messages.
if self.is_active:
_LOGGER.debug("Stopping consumer.")
self._consumer.stop()
self._consumer = None

self._rpc = None
self._closed = True
_LOGGER.debug("Finished stopping manager.")

for callback in self._close_callbacks:
callback(self, reason)


class AppendRowsFuture(concurrent.futures.Future, polling_future.PollingFuture):
"""Encapsulation of the asynchronous execution of an action.

This object is returned from long-running BigQuery Storage API calls, and
is the interface to determine the status of those calls.

This object should not be created directly, but is returned by other
methods in this library.
"""

def done(self, retry: Optional[google.api_core.retry.Retry] = None) -> bool:
"""Check the status of the future.

Args:
retry:
Not used. Included for compatibility with the base class. Future
status is updated by a background thread.

Returns:
``True`` if the request has finished, otherwise ``False``.
"""
# Consumer should call set_result or set_exception method, where this
# gets set to True *after* first setting _result.
#
# Consumer runs in a background thread, but this access is thread-safe:
# https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
return self._result_set

def set_running_or_notify_cancel(self):
"""Not implemented.

This method is needed to make the future API compatible with the
concurrent.futures package, but since this is not constructed by an
executor of the concurrent.futures package, no implementation is
needed. See: https://github.com/googleapis/python-pubsub/pull/397
"""
raise NotImplementedError(
"Only used by executors from `concurrent.futures` package."
)
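
To round out the picture, a sketch of how the stream and futures from this module might be used together, continuing the hypothetical my_record_pb2 example above (stream is the AppendRowsStream returned by BigQueryWriteClient.append_rows):

from google.cloud.bigquery_storage_v1beta2 import types

# Subsequent requests inherit the stream name and writer schema from the
# initial request, so only the rows (and, optionally, an offset) are needed.
futures = []
for row_batch in serialized_row_batches:  # hypothetical list of types.ProtoRows
    request = types.AppendRowsRequest(
        proto_rows=types.AppendRowsRequest.ProtoData(rows=row_batch)
    )
    futures.append(stream.send(request))

# Block until every append is acknowledged, then shut down the stream.
for future in futures:
    future.result()
stream.close()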
15 changes: 15 additions & 0 deletions samples/snippets/__init__.py
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
#
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.