6 changes: 3 additions & 3 deletions bigframes/pandas/__init__.py
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 from collections import namedtuple
-from datetime import datetime
+from datetime import date, datetime
 import inspect
 import sys
 import typing
@@ -194,7 +194,7 @@ def to_datetime(
 
 @typing.overload
 def to_datetime(
-    arg: Union[int, float, str, datetime],
+    arg: Union[int, float, str, datetime, date],
     *,
     utc: bool = False,
     format: Optional[str] = None,
@@ -205,7 +205,7 @@
 
 def to_datetime(
     arg: Union[
-        Union[int, float, str, datetime],
+        Union[int, float, str, datetime, date],
         vendored_pandas_datetimes.local_iterables,
         bigframes.series.Series,
         bigframes.dataframe.DataFrame,
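Taken together, the widened overloads mean a plain `datetime.date` is now accepted wherever a `datetime` was. A minimal sketch of what this enables (assuming a configured BigQuery session where one is needed; the values are illustrative):

```python
from datetime import date

import bigframes.pandas as bpd

# A bare date is now a valid scalar argument to to_datetime.
ts = bpd.to_datetime(date(2024, 7, 1))

# Iterables containing the newly supported type flow through the
# non-overloaded signature as well.
series = bpd.to_datetime([date(2024, 7, 1), date(2024, 7, 2)])
```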
51 changes: 43 additions & 8 deletions bigframes/streaming/dataframe.py
@@ -15,13 +15,16 @@
 """Module for bigquery continuous queries"""
 from __future__ import annotations
 
+from abc import abstractmethod
+from datetime import date, datetime
 import functools
 import inspect
 import json
-from typing import Optional
+from typing import Optional, Union
 import warnings
 
 from google.cloud import bigquery
+import pandas as pd
 
 from bigframes import dataframe
 from bigframes.core import log_adapter, nodes
@@ -54,9 +57,14 @@ def _curate_df_doc(doc: Optional[str]):
 
 
 class StreamingBase:
-    _appends_sql: str
     _session: bigframes.session.Session
 
+    @abstractmethod
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
+        pass
+
     def to_bigtable(
         self,
         *,
@@ -70,6 +78,8 @@ def to_bigtable(
         bigtable_options: Optional[dict] = None,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
+        end_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continuous job and return a
@@ -115,16 +125,24 @@ def to_bigtable(
                 If specified, a job id prefix for the query, see
                 job_id_prefix parameter of
                 https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
 
+            start_timestamp (int, float, str, datetime, date, default None):
+                The starting timestamp for the query. Values may be up to
+                7 days in the past. If no timestamp is specified (None), the
+                query defaults to the earliest possible time, 7 days ago. A
+                time-zone-naive timestamp is treated as UTC.
         Returns:
             google.cloud.bigquery.QueryJob:
                 See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
                 The ongoing query job can be managed using this object.
                 For example, the job can be cancelled or its error status
                 can be examined.
         """
+        if not isinstance(
+            start_timestamp, (int, float, str, datetime, date, type(None))
+        ):
+            raise ValueError(
+                f"Unsupported start_timestamp type {type(start_timestamp)}"
+            )
+
         return _to_bigtable(
-            self._appends_sql,
+            self._appends_sql(start_timestamp),
             instance=instance,
             table=table,
             service_account_email=service_account_email,
@@ -145,6 +163,7 @@ def to_pubsub(
         service_account_email: str,
         job_id: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        start_timestamp: Optional[Union[int, float, str, datetime, date]] = None,
     ) -> bigquery.QueryJob:
         """
         Export the StreamingDataFrame as a continuous job and return a
@@ -172,6 +191,8 @@ def to_pubsub(
             If specified, a job id prefix for the query, see
             job_id_prefix parameter of
             https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+            start_timestamp (int, float, str, datetime, date, default None):
+                The starting timestamp for the query. Values may be up to
+                7 days in the past. If no timestamp is specified (None), the
+                query defaults to the earliest possible time, 7 days ago. A
+                time-zone-naive timestamp is treated as UTC.
 
         Returns:
             google.cloud.bigquery.QueryJob:
@@ -180,8 +201,15 @@ def to_pubsub(
                 For example, the job can be cancelled or its error status
                 can be examined.
         """
+        if not isinstance(
+            start_timestamp, (int, float, str, datetime, date, type(None))
+        ):
+            raise ValueError(
+                f"Unsupported start_timestamp type {type(start_timestamp)}"
+            )
+
         return _to_pubsub(
-            self._appends_sql,
+            self._appends_sql(start_timestamp),
             topic=topic,
             service_account_email=service_account_email,
             session=self._session,
@@ -280,14 +308,21 @@ def sql(self):
     sql.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.sql))
 
     # Patch for the required APPENDS clause
-    @property
-    def _appends_sql(self):
+    def _appends_sql(
+        self, start_timestamp: Optional[Union[int, float, str, datetime, date]]
+    ) -> str:
         sql_str = self.sql
         original_table = self._original_table
         assert original_table is not None
 
-        # TODO(b/405691193): set start time back to NULL. Now set it slightly after 7 days max interval to avoid the bug.
-        appends_clause = f"APPENDS(TABLE `{original_table}`, CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE))"
+        start_ts_str = (
+            f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')"
+            if start_timestamp is not None
+            else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
+        )
+
+        appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})"
         sql_str = sql_str.replace(f"`{original_table}`", appends_clause)
         return sql_str
 
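To make the new `APPENDS` plumbing concrete, here is a standalone sketch of the timestamp rendering performed by the patched `_appends_sql`. The table name and query are hypothetical; only the string manipulation mirrors the method above:

```python
# Sketch of the timestamp rendering in _appends_sql (hypothetical names).
from datetime import datetime

import pandas as pd

original_table = "my-project.my_dataset.my_table"  # hypothetical
sql_str = f"SELECT * FROM `{original_table}`"      # stand-in for self.sql

start_timestamp = datetime(2024, 7, 1, 12, 30)  # naive, treated as UTC

# pd.to_datetime normalizes int/float/str/datetime/date inputs into a
# Timestamp, which renders cleanly inside a BigQuery TIMESTAMP literal.
start_ts_str = (
    f"TIMESTAMP('{pd.to_datetime(start_timestamp)}')"
    if start_timestamp is not None
    else "CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)"
)

appends_clause = f"APPENDS(TABLE `{original_table}`, {start_ts_str})"
print(sql_str.replace(f"`{original_table}`", appends_clause))
# SELECT * FROM APPENDS(TABLE `my-project.my_dataset.my_table`,
#     TIMESTAMP('2024-07-01 12:30:00'))
```

Passing `None` keeps the previous behavior: a change-history window opening just inside the 7-day maximum, `CURRENT_TIMESTAMP() - (INTERVAL 7 DAY - INTERVAL 5 MINUTE)`.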
6 changes: 4 additions & 2 deletions tests/system/large/streaming/test_bigtable.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime, timedelta
 import time
 from typing import Generator
 import uuid
@@ -91,11 +92,12 @@ def test_streaming_df_to_bigtable(
         bigtable_options={},
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
 
-    # wait 100 seconds in order to ensure the query doesn't stop
+    # wait 200 seconds in order to ensure the query doesn't stop
     # (i.e. it is continuous)
-    time.sleep(100)
+    time.sleep(200)
     assert query_job.running()
     assert query_job.error_result is None
     assert str(query_job.job_id).startswith(job_id_prefix)
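The test exercises the new parameter end to end. As a caller-facing sketch, keeping only the arguments visible in this diff: `sdf` stands in for a `StreamingDataFrame` obtained from a bigframes session (construction elided), and the instance and table names are hypothetical:

```python
from datetime import datetime, timedelta

query_job = sdf.to_bigtable(
    instance="my-instance",  # hypothetical Bigtable instance
    table="my-table",        # hypothetical Bigtable table
    service_account_email="sa@my-project.iam.gserviceaccount.com",  # hypothetical
    bigtable_options={},
    job_id=None,
    job_id_prefix="test_streaming_",
    # New in this change: replay appends from the last day only,
    # instead of the full 7-day default window.
    start_timestamp=datetime.now() - timedelta(days=1),
)

# The returned job is a continuous query; it should keep running
# until cancelled, with no error recorded.
assert query_job.running()
assert query_job.error_result is None
```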
6 changes: 4 additions & 2 deletions tests/system/large/streaming/test_pubsub.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from concurrent import futures
+from datetime import datetime, timedelta
 from typing import Generator
 import uuid
 
@@ -99,11 +100,12 @@ def callback(message):
         service_account_email="[email protected]",
         job_id=None,
         job_id_prefix=job_id_prefix,
+        start_timestamp=datetime.now() - timedelta(days=1),
     )
     try:
-        # wait 100 seconds in order to ensure the query doesn't stop
+        # wait 200 seconds in order to ensure the query doesn't stop
         # (i.e. it is continuous)
-        future.result(timeout=100)
+        future.result(timeout=200)
     except futures.TimeoutError:
         future.cancel()
     assert query_job.running()
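The Pub/Sub export takes the same parameter. A hedged sketch mirroring the test, with the topic and service account hypothetical and the `StreamingDataFrame` construction again elided:

```python
from datetime import datetime, timedelta

query_job = sdf.to_pubsub(
    topic="my-topic",  # hypothetical Pub/Sub topic
    service_account_email="sa@my-project.iam.gserviceaccount.com",  # hypothetical
    job_id=None,
    job_id_prefix="test_streaming_pubsub_",
    # Replay appends from the last day instead of the 7-day default window.
    start_timestamp=datetime.now() - timedelta(days=1),
)

# Continuous query: it stays running until explicitly cancelled.
assert query_job.running()
query_job.cancel()
```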