
Commit a63cbae

feat: add StreamingDataFrame.to_bigtable and .to_pubsub start_timestamp parameter (#2066)
* feat: add StreamingDataFrame.to_bigtable and .to_pubsub start_timestamp parameter
* fix test
1 parent b0ff718 commit a63cbae

File tree: 4 files changed, +54 -15 lines changed
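Taken together, the changes below let callers bound how far back the continuous query replays table appends. A minimal usage sketch, assuming sdf is an existing StreamingDataFrame; the instance, table, and service-account values are hypothetical:

from datetime import datetime, timedelta

# Replay appends from the last day only; the APPENDS window reaches at
# most 7 days back, and a time-zone-naive datetime is treated as UTC.
query_job = sdf.to_bigtable(
    instance="my-instance",  # hypothetical Bigtable instance
    table="my-table",  # hypothetical Bigtable table
    service_account_email="sa@my-project.iam.gserviceaccount.com",  # hypothetical
    start_timestamp=datetime.now() - timedelta(days=1),
)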

bigframes/pandas/__init__.py

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 from collections import namedtuple
-from datetime import datetime
+from datetime import date, datetime
 import inspect
 import sys
 import typing
@@ -198,7 +198,7 @@ def to_datetime(
 
 @typing.overload
 def to_datetime(
-    arg: Union[int, float, str, datetime],
+    arg: Union[int, float, str, datetime, date],
     *,
     utc: bool = False,
     format: Optional[str] = None,
@@ -209,7 +209,7 @@ def to_datetime(
 
 def to_datetime(
     arg: Union[
-        Union[int, float, str, datetime],
+        Union[int, float, str, datetime, date],
         vendored_pandas_datetimes.local_iterables,
         bigframes.series.Series,
         bigframes.dataframe.DataFrame,
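With the widened overloads above, a plain datetime.date is now accepted wherever the scalar datetime already was. A quick sketch, assuming a configured BigFrames session:

from datetime import date

import bigframes.pandas as bpd

# A date is now a valid scalar argument alongside int, float, str, and datetime.
ts = bpd.to_datetime(date(2025, 1, 15))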

bigframes/streaming/dataframe.py

Lines changed: 43 additions & 8 deletions
@@ -15,13 +15,16 @@
 """Module for bigquery continuous queries"""
 from __future__ import annotations
 
+from abc import abstractmethod
+from datetime import date, datetime
 import functools
 import inspect
 import json
-from typing import Optional
+from typing import Optional, Union
 import warnings
 
 from google.cloud import bigquery
+import pandas as pd
 
 from bigframes import dataframe
 from bigframes.core import log_adapter, nodes
@@ -54,9 +57,14 @@ def _curate_df_doc(doc: Optional[str]):
 
 
 class StreamingBase:
-    _appends_sql: str
     _session: bigframes.session.Session
 
+    @abstractmethod
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
+        pass
+
     def to_bigtable(
         self,
         *,
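Replacing the _appends_sql class attribute with an abstract method means each concrete streaming class must now derive its appends SQL from the requested start time. A schematic of the contract, using a hypothetical subclass and table name:

from datetime import date, datetime
from typing import Optional, Union


class FakeStreamingSource(StreamingBase):
    # Hypothetical subclass, for illustration only: wraps a table in the
    # APPENDS change-history function starting at the given timestamp.
    def _appends_sql(
        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
    ) -> str:
        start = f"TIMESTAMP('{start_timestamp}')" if start_timestamp else "NULL"
        return f"SELECT * FROM APPENDS(TABLE `my_table`, {start})"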
@@ -70,6 +78,8 @@ def to_bigtable(
         bigtable_options: Optional[dict] = None,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
+        end_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continue job and returns a
@@ -115,16 +125,24 @@ def to_bigtable(
             If specified, a job id prefix for the query, see
             job_id_prefix parameter of
             https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
-
+            start_timestamp (int, float, str, datetime, date, default None):
+                The starting timestamp for the query; values may reach up to 7 days into the past. If no timestamp is given (None), the query defaults to the earliest possible time, 7 days ago. A time-zone-naive timestamp is treated as UTC.
         Returns:
             google.cloud.bigquery.QueryJob:
                 See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
                 The ongoing query job can be managed using this object.
                 For example, the job can be cancelled or its error status
                 can be examined.
         """
+        if not isinstance(
+            start_timestamp, (int, float, str, datetime, date, type(None))
+        ):
+            raise ValueError(
+                f"Unsupported start_timestamp type {type(start_timestamp)}"
+            )
+
         return _to_bigtable(
-            self._appends_sql,
+            self._appends_sql(start_timestamp),
             instance=instance,
             table=table,
             service_account_email=service_account_email,
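The guard rejects anything outside the documented scalar types before any SQL is built. For instance, this hypothetical call fails fast:

# Raises ValueError: Unsupported start_timestamp type <class 'list'>
sdf.to_bigtable(
    instance="my-instance",  # hypothetical
    table="my-table",  # hypothetical
    service_account_email="sa@my-project.iam.gserviceaccount.com",  # hypothetical
    start_timestamp=[2025, 1, 15],
)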
@@ -145,6 +163,7 @@ def to_pubsub(
         service_account_email: str,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continue job and returns a
@@ -172,6 +191,8 @@ def to_pubsub(
             If specified, a job id prefix for the query, see
             job_id_prefix parameter of
             https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+            start_timestamp (int, float, str, datetime, date, default None):
+                The starting timestamp for the query; values may reach up to 7 days into the past. If no timestamp is given (None), the query defaults to the earliest possible time, 7 days ago. A time-zone-naive timestamp is treated as UTC.
 
         Returns:
             google.cloud.bigquery.QueryJob:
@@ -180,8 +201,15 @@ def to_pubsub(
             For example, the job can be cancelled or its error status
             can be examined.
         """
+        if not isinstance(
+            start_timestamp, (int, float, str, datetime, date, type(None))
+        ):
+            raise ValueError(
+                f"Unsupported start_timestamp type {type(start_timestamp)}"
+            )
+
         return _to_pubsub(
-            self._appends_sql,
+            self._appends_sql(start_timestamp),
             topic=topic,
             service_account_email=service_account_email,
             session=self._session,
@@ -280,14 +308,21 @@ def sql(self):
     sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql))
 
     # Patch for the required APPENDS clause
-    @property
-    def _appends_sql(self):
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
         sql_str = self.sql
         original_table = self._original_table
         assert original_table is not None
 
         # TODO(b/405691193): set start time back to NULL. Now set it slightly after 7 days max interval to avoid the bug.
-        appends_clause = f"APPENDS(TABLE `{original_table}`, CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE))"
+        start_ts_str = (
+            str(f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')")
+            if start_timestamp
+            else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
+        )
+
+        appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})"
         sql_str = sql_str.replace(f"`{original_table}`", appends_clause)
         return sql_str
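The timestamp formatting above leans on pd.to_datetime. A standalone sketch of the same branch, reimplemented here purely for illustration, shows the SQL expression it should emit:

from datetime import date, datetime
from typing import Optional, Union

import pandas as pd


def appends_start_expr(
    start_timestamp: Optional[Union[int, float, str, datetime, date]],
) -> str:
    # Mirrors the committed logic: an explicit timestamp becomes a SQL
    # TIMESTAMP literal; None falls back to just inside the 7-day window.
    if start_timestamp:
        return f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')"
    return "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"


print(appends_start_expr("2025-01-15"))
# TIMESTAMP('2025-01-15 00:00:00')
print(appends_start_expr(None))
# CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)

One quirk worth noting: the branch tests truthiness rather than is not None, so a falsy value such as the integer 0 (the Unix epoch) falls through to the 7-day default instead of being formatted as a literal.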

tests/system/large/streaming/test_bigtable.py

Lines changed: 4 additions & 2 deletions
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime, timedelta
 import time
 from typing import Generator
 import uuid
@@ -91,11 +92,12 @@ def test_streaming_df_to_bigtable(
         bigtable_options={},
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
 
-    # wait 100 seconds in order to ensure the query doesn't stop
+    # wait 200 seconds in order to ensure the query doesn't stop
     # (i.e. it is continuous)
-    time.sleep(100)
+    time.sleep(200)
     assert query_job.running()
     assert query_job.error_result is None
     assert str(query_job.job_id).startswith(job_id_prefix)

tests/system/large/streaming/test_pubsub.py

Lines changed: 4 additions & 2 deletions
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from concurrent import futures
+from datetime import datetime, timedelta
 from typing import Generator
 import uuid
 
@@ -99,11 +100,12 @@ def callback(message):
         service_account_email="[email protected]",
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
     try:
-        # wait 100 seconds in order to ensure the query doesn't stop
+        # wait 200 seconds in order to ensure the query doesn't stop
         # (i.e. it is continuous)
-        future.result(timeout=100)
+        future.result(timeout=200)
     except futures.TimeoutError:
         future.cancel()
     assert query_job.running()
