From aa3622dbf950774cb14dc30f1d004175441d056f Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Wed, 10 Sep 2025 00:15:00 +0000
Subject: [PATCH 1/2] feat: add StreamingDataFrame.to_bigtable and .to_pubsub
 start_timestamp parameter

---
 bigframes/pandas/__init__.py                  |  6 +--
 bigframes/streaming/dataframe.py              | 50 ++++++++++++++++---
 tests/system/large/streaming/test_bigtable.py |  4 +-
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 6ffed5b53f..81f441f16f 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -17,7 +17,7 @@ from __future__ import annotations
 
 from collections import namedtuple
-from datetime import datetime
+from datetime import date, datetime
 import inspect
 import sys
 import typing
@@ -194,7 +194,7 @@ def to_datetime(
 
 @typing.overload
 def to_datetime(
-    arg: Union[int, float, str, datetime],
+    arg: Union[int, float, str, datetime, date],
     *,
     utc: bool = False,
     format: Optional[str] = None,
@@ -205,7 +205,7 @@ def to_datetime(
 
 def to_datetime(
     arg: Union[
-        Union[int, float, str, datetime],
+        Union[int, float, str, datetime, date],
         vendored_pandas_datetimes.local_iterables,
         bigframes.series.Series,
         bigframes.dataframe.DataFrame,
diff --git a/bigframes/streaming/dataframe.py b/bigframes/streaming/dataframe.py
index 69247879d1..6cef3153b1 100644
--- a/bigframes/streaming/dataframe.py
+++ b/bigframes/streaming/dataframe.py
@@ -15,13 +15,16 @@
 """Module for bigquery continuous queries"""
 from __future__ import annotations
 
+from abc import abstractmethod
+from datetime import date, datetime
 import functools
 import inspect
 import json
-from typing import Optional
+from typing import Optional, Union
 import warnings
 
 from google.cloud import bigquery
+import pandas as pd
 
 from bigframes import dataframe
 from bigframes.core import log_adapter, nodes
@@ -54,9 +57,14 @@ def _curate_df_doc(doc: Optional[str]):
 
 
 class StreamingBase:
-    _appends_sql: str
     _session: bigframes.session.Session
 
+    @abstractmethod
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
+        pass
+
     def to_bigtable(
         self,
         *,
@@ -70,6 +78,8 @@ def to_bigtable(
         bigtable_options: Optional[dict] = None,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
+        end_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continue job and returns a
@@ -115,6 +125,8 @@ def to_bigtable(
                 If specified, a job id prefix for the query, see
                 job_id_prefix parameter of
                 https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+            start_timestamp (int, float, str, datetime, date, default None):
+                The start timestamp of the query. Values must be within the last 7 days. If None, the query starts from the earliest allowed time, 7 days ago. Time-zone-naive values are treated as UTC.
 
         Returns:
             google.cloud.bigquery.QueryJob:
                 See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
                 For example, the job can be cancelled or its error
                 status can be examined.
""" + if not isinstance( + start_timestamp, (int, float, str, datetime, date, type(None)) + ): + raise ValueError( + f"Unsupported start_timestamp type {type(start_timestamp)}" + ) + return _to_bigtable( - self._appends_sql, + self._appends_sql(start_timestamp), instance=instance, table=table, service_account_email=service_account_email, @@ -145,6 +164,7 @@ def to_pubsub( service_account_email: str, job_id: Optional[str] = None, job_id_prefix: Optional[str] = None, + start_timestamp: Optional[Union[int, float, str, datetime, date]] = None, ) -> bigquery.QueryJob: """ Export the StreamingDataFrame as a continue job and returns a @@ -172,6 +192,8 @@ def to_pubsub( If specified, a job id prefix for the query, see job_id_prefix parameter of https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query + start_timestamp (int, float, str, datetime, date, default None): + The start timestamp of the query. Possible values should be within the recent 7 days. If None, will start from the max value, 7 days ago. If pass in time zone naive values, use UTC time zone. Returns: google.cloud.bigquery.QueryJob: @@ -180,8 +202,15 @@ def to_pubsub( For example, the job can be cancelled or its error status can be examined. """ + if not isinstance( + start_timestamp, (int, float, str, datetime, date, type(None)) + ): + raise ValueError( + f"Unsupported start_timestamp type {type(start_timestamp)}" + ) + return _to_pubsub( - self._appends_sql, + self._appends_sql(start_timestamp), topic=topic, service_account_email=service_account_email, session=self._session, @@ -280,14 +309,21 @@ def sql(self): sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql)) # Patch for the required APPENDS clause - @property - def _appends_sql(self): + def _appends_sql( + self, start_timestamp: Optional[Union[int, float, str, datetime, date]] + ) -> str: sql_str = self.sql original_table = self._original_table assert original_table is not None # TODO(b/405691193): set start time back to NULL. Now set it slightly after 7 days max interval to avoid the bug. - appends_clause = f"APPENDS(TABLE `{original_table}`, CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE))" + start_ts_str = ( + str(pd.to_datetime(start_timestamp)) + if start_timestamp + else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)" + ) + + appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})" sql_str = sql_str.replace(f"`{original_table}`", appends_clause) return sql_str diff --git a/tests/system/large/streaming/test_bigtable.py b/tests/system/large/streaming/test_bigtable.py index e57b7e6e0e..6a37f67be7 100644 --- a/tests/system/large/streaming/test_bigtable.py +++ b/tests/system/large/streaming/test_bigtable.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from datetime import datetime, timedelta
 import time
 from typing import Generator
 import uuid
@@ -67,7 +68,7 @@ def bigtable_table(
     bt_table.delete()
 
 
-@pytest.mark.flaky(retries=3, delay=10)
+# @pytest.mark.flaky(retries=3, delay=10)
 def test_streaming_df_to_bigtable(
     session_load: bigframes.Session, bigtable_table: table.Table
 ):
@@ -91,6 +92,7 @@ def test_streaming_df_to_bigtable(
         bigtable_options={},
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
 
     # wait 100 seconds in order to ensure the query doesn't stop

From 411d0815a4a3a2c2fe1d3cf5d5d125bac5d49d0e Mon Sep 17 00:00:00 2001
From: Garrett Wu
Date: Wed, 10 Sep 2025 18:25:02 +0000
Subject: [PATCH 2/2] fix test

---
 bigframes/streaming/dataframe.py              | 7 +++----
 tests/system/large/streaming/test_bigtable.py | 6 +++---
 tests/system/large/streaming/test_pubsub.py   | 6 ++++--
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/bigframes/streaming/dataframe.py b/bigframes/streaming/dataframe.py
index 6cef3153b1..7dc9e964bc 100644
--- a/bigframes/streaming/dataframe.py
+++ b/bigframes/streaming/dataframe.py
@@ -126,8 +126,7 @@ def to_bigtable(
                 job_id_prefix parameter of
                 https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
             start_timestamp (int, float, str, datetime, date, default None):
-                The start timestamp of the query. Values must be within the last 7 days. If None, the query starts from the earliest allowed time, 7 days ago. Time-zone-naive values are treated as UTC.
-
+                The starting timestamp for the query. Values can be up to 7 days in the past. If no timestamp is specified (None), the query defaults to the earliest possible time, 7 days ago. A time-zone-naive timestamp is treated as UTC.
         Returns:
             google.cloud.bigquery.QueryJob:
                 See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
@@ -193,7 +192,7 @@ def to_pubsub(
                 job_id_prefix parameter of
                 https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
             start_timestamp (int, float, str, datetime, date, default None):
-                The start timestamp of the query. Values must be within the last 7 days. If None, the query starts from the earliest allowed time, 7 days ago. Time-zone-naive values are treated as UTC.
+                The starting timestamp for the query. Values can be up to 7 days in the past. If no timestamp is specified (None), the query defaults to the earliest possible time, 7 days ago. A time-zone-naive timestamp is treated as UTC.
 
         Returns:
             google.cloud.bigquery.QueryJob:
                 See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
                 For example, the job can be cancelled or its error
                 status can be examined.
         """
@@ -318,7 +317,7 @@ def _appends_sql(
         # TODO(b/405691193): set start time back to NULL. Now set it slightly after 7 days max interval to avoid the bug.
         start_ts_str = (
-            str(pd.to_datetime(start_timestamp))
+            f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')"
             if start_timestamp is not None
             else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
         )
diff --git a/tests/system/large/streaming/test_bigtable.py b/tests/system/large/streaming/test_bigtable.py
index 6a37f67be7..38e01f44bc 100644
--- a/tests/system/large/streaming/test_bigtable.py
+++ b/tests/system/large/streaming/test_bigtable.py
@@ -68,7 +68,7 @@ def bigtable_table(
     bt_table.delete()
 
 
-# @pytest.mark.flaky(retries=3, delay=10)
+@pytest.mark.flaky(retries=3, delay=10)
 def test_streaming_df_to_bigtable(
     session_load: bigframes.Session, bigtable_table: table.Table
 ):
@@ -95,9 +95,9 @@ def test_streaming_df_to_bigtable(
         start_timestamp=datetime.now() - timedelta(days=1),
     )
 
-    # wait 100 seconds in order to ensure the query doesn't stop
+    # wait 200 seconds in order to ensure the query doesn't stop
     # (i.e. it is continuous)
-    time.sleep(100)
+    time.sleep(200)
     assert query_job.running()
     assert query_job.error_result is None
     assert str(query_job.job_id).startswith(job_id_prefix)
diff --git a/tests/system/large/streaming/test_pubsub.py b/tests/system/large/streaming/test_pubsub.py
index 277b44c93b..9ff965fd77 100644
--- a/tests/system/large/streaming/test_pubsub.py
+++ b/tests/system/large/streaming/test_pubsub.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from concurrent import futures
+from datetime import datetime, timedelta
 from typing import Generator
 import uuid
 
@@ -99,11 +100,12 @@ def callback(message):
         service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
 
     try:
-        # wait 100 seconds in order to ensure the query doesn't stop
+        # wait 200 seconds in order to ensure the query doesn't stop
         # (i.e. it is continuous)
-        future.result(timeout=100)
+        future.result(timeout=200)
     except futures.TimeoutError:
         future.cancel()
     assert query_job.running()
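
--

Usage sketch (not part of either patch): a minimal example of how the new start_timestamp parameter would be called once both patches are applied. It assumes an existing StreamingDataFrame named sdf, since its construction is outside this diff; the topic, service account, and job id prefix below are placeholders, not values taken from the patches.

    from datetime import datetime, timedelta, timezone

    # Replay appends from the last day rather than the 7-day maximum window.
    # Naive values are treated as UTC, so passing an aware value is clearer.
    start = datetime.now(timezone.utc) - timedelta(days=1)

    # `sdf` is assumed to be a StreamingDataFrame created elsewhere.
    query_job = sdf.to_pubsub(
        topic="my-topic",  # placeholder Pub/Sub topic
        service_account_email="sa@my-project.iam.gserviceaccount.com",  # placeholder
        job_id=None,
        job_id_prefix="streaming_example_",
        start_timestamp=start,  # new parameter added by this series
    )

    # The returned QueryJob is a continuous query; it can be monitored or cancelled.
    assert query_job.running()
    query_job.cancel()

Passing start_timestamp=None keeps the previous behavior: the APPENDS clause falls back to CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE), the earliest start the backend currently accepts per the TODO above.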