feat: add BigQueryWriteClient where append_rows returns a helper for writing rows #278
Conversation
…95450856-write-client
Here is the summary of changes. You are about to add 2 region tags.
This comment is generated by snippet-bot.
```python
# Make sure RPC has started before returning.
# Without this, consumers may get:
#
#   ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
# TODO: add timeout / failure / sleep
while not self._rpc.is_active:
    pass
```
@plamut This PR still has a lot of work to do before it's in a mergeable state, but I'd like your advice on this. I didn't see how it was handled in Pub/Sub.

In `BackgroundConsumer`, I see they wait until the thread has started (https://github.com/googleapis/python-api-core/blob/40f52bf1100cf56b7f9af267d210b8a72fc34f08/google/api_core/bidi.py#L693), but apparently not long enough for the RPC itself to have started.

Maybe there is another way I should be sending requests besides calling the `_rpc.send(request)` method directly?
Or maybe I'm not seeing it in Pub/Sub because `_rpc.send` is only used for heartbeats? Perhaps folks in Firestore have advice on how to block until we're ready to send requests?

Or maybe `send` should just be queuing requests somehow?

CC @crwilcox
@tswast You're asking about sending requests over the stream, IIUC?

I don't think Pub/Sub had to solve this in any way, as the only requests sent over the stream are the heartbeats that keep the stream alive. And yes, they invoke `_rpc.send()` directly (with a prior "is active" check), but that's about it. Other requests such as ACK and MODACK are sent as unary requests outside of the stream.

However, `send()` does queue requests if I'm not mistaken: `BidiRpc.send()` just puts the request on a queue and that's it. The request queue itself is wrapped in `_RequestQueueGenerator` and passed as an argument to a stream-stream RPC call. I imagine the gRPC machinery iterates over these requests and sends them to the server over the stream.

> Perhaps folks in Firestore have advice on how to block until we're ready to send requests?

It appears that we don't have to worry about whether the RPC has started yet, as requests are put in the queue and the RPC starts taking them out when it's ready. Or did I misunderstand the question?
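To make the queueing behavior described above concrete, here is a minimal, self-contained sketch of the pattern. It deliberately simplifies the real `google.api_core.bidi` classes (which also track whether the call is active and handle shutdown more carefully); the names `RequestQueueGenerator` and `MiniBidiRpc` are illustrative, not the library's actual API.

```python
import queue


class RequestQueueGenerator:
    """Iterates over queued requests; this is what a stream-stream RPC
    call would consume. A ``None`` sentinel closes the stream."""

    def __init__(self, q):
        self._queue = q

    def __iter__(self):
        while True:
            item = self._queue.get()
            if item is None:  # sentinel: close the request stream
                return
            yield item


class MiniBidiRpc:
    """Sketch of the send-is-just-a-queue-put behavior discussed above."""

    def __init__(self):
        self._request_queue = queue.Queue()

    def send(self, request):
        # send() returns immediately; the request waits in the queue
        # until the RPC machinery pulls it out of the generator.
        self._request_queue.put(request)

    def close(self):
        self._request_queue.put(None)
```

In this model, callers never block on the stream being open; the generator simply hands requests over whenever the consumer is ready to iterate.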
The problem is that it's not queueing. I get `ValueError: Can not send() on an RPC that has never been open()ed.` if I try to send without waiting first.
This busy-wait loop is rather scary (it could result in an infinite hang if things go wrong), so I'd rather avoid it.
Oh... and presumably that happens even with the "is active" check?

```python
if self._rpc is not None and self._rpc.is_active:
    self._rpc.send(...)
```

If that doesn't cut it, then I don't know, sorry. I suppose in Pub/Sub we don't even hit this problem because, as you said, only occasional heartbeats are sent over the stream.
I see, and yeah, it would be nice if we could avoid it. Fortunately Pub/Sub didn't have to deal with that 🙃, but I guess the Firestore folks will know more.
Unfortunately, it appears to me that the Firestore use case is rather similar to Pub/Sub in that I don't see anywhere that they actually send requests over the `_rpc`. I suppose that makes sense, given the class is named "Watch": https://github.com/googleapis/python-firestore/blob/main/google/cloud/firestore_v1/watch.py
> This busy wait loop is rather scary (could result in infinite hang if things go wrong) so I'd rather avoid it.

What about polling with a (bounded) exponential retry up to some reasonable threshold? If there's no better way...

@lidizheng Do you know if there's anything in gRPC (an event, a callback...) that would allow blocking until the stream is ready, i.e. without having to poll with `call.is_active()`?
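The bounded-exponential-retry idea could look something like the sketch below. It assumes only that `rpc` exposes an `is_active` property (as `google.api_core.bidi.BidiRpc` does); the timeout and delay defaults are illustrative, not tuned values.

```python
import time


def wait_until_active(rpc, timeout=10.0, initial_delay=0.01,
                      multiplier=2.0, max_delay=1.0):
    """Poll ``rpc.is_active`` with a bounded exponential backoff.

    Raises TimeoutError instead of hanging forever, addressing the
    concern about the unbounded busy-wait loop.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while not rpc.is_active:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "stream did not become active within %.1fs" % timeout)
        time.sleep(delay)
        delay = min(delay * multiplier, max_delay)  # cap the backoff
```

Unlike the original `while not self._rpc.is_active: pass`, this sleeps between checks (no CPU spin) and fails loudly after the deadline.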
We have `channel_ready_future` for channel-level readiness, but for calls we currently don't have an API to block until a message is available. This design is more or less related to the read buffer: we don't want to cache too many messages without sending them to the application. Today we only cache 1 extra message, and after it is sent to the application (or the gRPC Python layer), the message buffer is removed.

On the other hand, if we want this feature now, it can be done via interceptors (block_until_ready == read_and_cache) or another thread. Or, if there is a strong need for this API, we can design some mechanism to block the stream until a read is available.
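The "another thread" workaround mentioned above can be sketched with the standard library: the thread doing the reads sets an event once the first response (or a clean end of stream) proves the RPC is actually open, and callers block on that event instead of polling. `ReadySignalingConsumer` is a hypothetical helper, not part of grpc or google-api-core; for channel-level readiness, `grpc.channel_ready_future(channel).result(timeout=...)` already exists.

```python
import threading


class ReadySignalingConsumer:
    """Drain a response iterator on a worker thread, signaling an event
    as soon as the stream has demonstrably started."""

    def __init__(self, responses, on_response=None):
        self._responses = responses
        self._on_response = on_response or (lambda response: None)
        self.ready = threading.Event()

    def run(self):
        for response in self._responses:
            self.ready.set()  # first message seen: the stream is live
            self._on_response(response)
        self.ready.set()  # a clean end-of-stream also means it was open
```

A caller would start `run()` on a thread and then `consumer.ready.wait(timeout=...)` before sending, trading the busy-wait for a bounded block on an event.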
This PR was accidentally closed when