fix: show progress even in job optional queries #2119
@@ -0,0 +1,210 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import dataclasses
import datetime
import threading
from typing import List, Optional
import weakref

import google.cloud.bigquery._job_helpers
import google.cloud.bigquery.job.query
import google.cloud.bigquery.table

import bigframes.formatting_helpers
import bigframes.session.executor


@dataclasses.dataclass(frozen=True)
class Subscriber:
    callback_ref: weakref.ref
    # TODO(tswast): Add block_id to allow filter in context managers.


class Publisher:
    def __init__(self):
        self._subscribers: List[Subscriber] = []
        self._subscribers_lock = threading.Lock()
Contributor
Single lock on a global could cause unexpected cross-session contention.
    def subscribe(self, callback):
        subscriber = Subscriber(callback_ref=weakref.ref(callback))

        with self._subscribers_lock:
            # TODO(tswast): Add block_id to allow filter in context managers.
            self._subscribers.append(subscriber)

    def send(self, event: Event):
        to_delete = []
        to_call = []

        with self._subscribers_lock:
            for sid, subscriber in enumerate(self._subscribers):
                callback = subscriber.callback_ref()

                if callback is None:
                    to_delete.append(sid)
                else:
                    # TODO(tswast): Add if statement for block_id to allow filter
                    # in context managers.
                    to_call.append(callback)

            for sid in reversed(to_delete):
                del self._subscribers[sid]

        for callback in to_call:
            callback(event)


publisher = Publisher()
publisher.subscribe(bigframes.formatting_helpers.progress_callback)


class Event:
    pass


class ExecutionStarted(Event):
    pass
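As a quick orientation (not part of the diff), the module-level publisher can be exercised as in the minimal sketch below. The on_event callback is hypothetical, and the caller must hold a strong reference to it, since Subscriber only keeps a weakref.

def on_event(event: Event) -> None:
    # Hypothetical subscriber: report each event as it arrives.
    print(f"event: {type(event).__name__}")


publisher.subscribe(on_event)        # stores only a weakref to on_event
publisher.send(ExecutionStarted())   # calls every still-alive subscriber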
Comment on lines +99 to +100

Contributor
Should we have an execution_id or similar so we can correlate all the events tied to a single request?

Collaborator (Author)
That could help if we start doing async / background query execution. I don't think it's needed right now, though.
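If the correlation idea above were pursued, one hedged sketch (not part of this change) is to give the Event base class an optional id that the executor mints once per request and copies onto every event it publishes. The execution_id field and the executor-side usage below are hypothetical.

import uuid

@dataclasses.dataclass(frozen=True)
class Event:
    # Hypothetical variant of the Event base class: every event emitted while
    # handling one request carries the same execution_id, so subscribers can
    # group ExecutionStarted/Running/Finished events by request.
    execution_id: Optional[str] = None


# Hypothetical executor-side usage: mint one id per request and reuse it.
execution_id = uuid.uuid4().hex
publisher.send(ExecutionStarted(execution_id=execution_id))
publisher.send(ExecutionFinished(execution_id=execution_id))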
class ExecutionRunning(Event):
    pass


@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):
    result: Optional[bigframes.session.executor.ExecuteResult] = None


@dataclasses.dataclass(frozen=True)
class BigQuerySentEvent(ExecutionRunning):
    """Query sent to BigQuery."""

    query: str
    billing_project: Optional[str] = None
    location: Optional[str] = None
    job_id: Optional[str] = None
    request_id: Optional[str] = None

    @classmethod
    def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
        return cls(
            query=event.query,
            billing_project=event.billing_project,
            location=event.location,
            job_id=event.job_id,
            request_id=event.request_id,
        )


@dataclasses.dataclass(frozen=True)
class BigQueryRetryEvent(ExecutionRunning):
    """Query sent another time because the previous attempt failed."""

    query: str
    billing_project: Optional[str] = None
    location: Optional[str] = None
    job_id: Optional[str] = None
    request_id: Optional[str] = None

    @classmethod
    def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
        return cls(
            query=event.query,
            billing_project=event.billing_project,
            location=event.location,
            job_id=event.job_id,
            request_id=event.request_id,
        )


@dataclasses.dataclass(frozen=True)
class BigQueryReceivedEvent(ExecutionRunning):
    """Query received and acknowledged by the BigQuery API."""

    billing_project: Optional[str] = None
    location: Optional[str] = None
    job_id: Optional[str] = None
    statement_type: Optional[str] = None
    state: Optional[str] = None
    query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
    created: Optional[datetime.datetime] = None
    started: Optional[datetime.datetime] = None
    ended: Optional[datetime.datetime] = None

    @classmethod
    def from_bqclient(
        cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
    ):
        return cls(
            billing_project=event.billing_project,
            location=event.location,
            job_id=event.job_id,
            statement_type=event.statement_type,
            state=event.state,
            query_plan=event.query_plan,
            created=event.created,
            started=event.started,
            ended=event.ended,
        )


@dataclasses.dataclass(frozen=True)
class BigQueryFinishedEvent(ExecutionRunning):
    """Query finished successfully."""

    billing_project: Optional[str] = None
    location: Optional[str] = None
    query_id: Optional[str] = None
    job_id: Optional[str] = None
    destination: Optional[google.cloud.bigquery.table.TableReference] = None
    total_rows: Optional[int] = None
    total_bytes_processed: Optional[int] = None
    slot_millis: Optional[int] = None
    created: Optional[datetime.datetime] = None
    started: Optional[datetime.datetime] = None
    ended: Optional[datetime.datetime] = None

    @classmethod
    def from_bqclient(
        cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
    ):
        return cls(
            billing_project=event.billing_project,
            location=event.location,
            query_id=event.query_id,
            job_id=event.job_id,
            destination=event.destination,
            total_rows=event.total_rows,
            total_bytes_processed=event.total_bytes_processed,
            slot_millis=event.slot_millis,
            created=event.created,
            started=event.started,
            ended=event.ended,
        )


@dataclasses.dataclass(frozen=True)
class BigQueryUnknownEvent(ExecutionRunning):
    """Got unknown event from the BigQuery client library."""

    # TODO: should we just skip sending unknown events?

    event: object

    @classmethod
    def from_bqclient(cls, event):
        return cls(event)
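To show how the from_bqclient constructors above are meant to be used, here is a hedged sketch of translating a client-library progress event into the matching bigframes event and forwarding it to subscribers. The wrap_bqclient_event helper is hypothetical and not part of this diff; the event types come from the annotations in the classmethods above.

def wrap_bqclient_event(bq_event: object) -> ExecutionRunning:
    # Hypothetical helper: map google-cloud-bigquery progress events onto the
    # event classes defined above, falling back to BigQueryUnknownEvent.
    helpers = google.cloud.bigquery._job_helpers
    if isinstance(bq_event, helpers.QuerySentEvent):
        return BigQuerySentEvent.from_bqclient(bq_event)
    if isinstance(bq_event, helpers.QueryRetryEvent):
        return BigQueryRetryEvent.from_bqclient(bq_event)
    if isinstance(bq_event, helpers.QueryReceivedEvent):
        return BigQueryReceivedEvent.from_bqclient(bq_event)
    if isinstance(bq_event, helpers.QueryFinishedEvent):
        return BigQueryFinishedEvent.from_bqclient(bq_event)
    return BigQueryUnknownEvent.from_bqclient(bq_event)


# A query runner could then forward each client-library event, e.g.:
#     publisher.send(wrap_bqclient_event(bq_event))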
Review thread on the weakref-based subscriber references:

- I think it's more intuitive to just keep subscribers alive. What is the scenario we are imagining? I could imagine this ref getting deleted even while the subscriber is still alive, because they created an ephemeral function that went out of scope even though the target of that function is still around.

- The anywidget table widgets. When they re-run the cell, the TableWidget we create is no longer needed and should go out of scope, but as far as I know, we don't really have an opportunity to unsubscribe at that time.

- I really do think the weakref here will have quite unintuitive results. There are some other options; the main thing is that we want to unsubscribe before the callback becomes invalid (most crucially, because it points at resources that no longer exist). The subscriber itself is best placed to time this, and that may not correlate with Python object cleanup.

- Hmm, maybe any widget could make some short-lived subscribers before/after a call to to_pandas_batches()? I can give it a try.