2727import functools
2828import itertools
2929import random
30- import textwrap
3130import typing
3231from typing import (
3332 Iterable ,
5453from bigframes .core import agg_expressions , local_data
5554import bigframes .core as core
5655import bigframes .core .agg_expressions as ex_types
57- import bigframes .core .compile .googlesql as googlesql
5856import bigframes .core .expression as ex
5957import bigframes .core .expression as scalars
6058import bigframes .core .guid as guid
6159import bigframes .core .identifiers
6260import bigframes .core .join_def as join_defs
6361import bigframes .core .ordering as ordering
6462import bigframes .core .pyarrow_utils as pyarrow_utils
65- import bigframes .core .schema as bf_schema
66- import bigframes .core .sql as sql
6763import bigframes .core .utils as utils
6864import bigframes .core .window_spec as windows
6965import bigframes .dtypes
@@ -2776,14 +2772,6 @@ def _throw_if_null_index(self, opname: str):
27762772 )
27772773
27782774 def _get_rows_as_json_values (self ) -> Block :
2779- # We want to preserve any ordering currently present before turning to
2780- # direct SQL manipulation. We will restore the ordering when we rebuild
2781- # expression.
2782- # TODO(shobs): Replace direct SQL manipulation by structured expression
2783- # manipulation
2784- expr , ordering_column_name = self .expr .promote_offsets ()
2785- expr_sql = self .session ._executor .to_sql (expr )
2786-
27872775 # Names of the columns to serialize for the row.
27882776 # We will use the repr-eval pattern to serialize a value here and
27892777 # deserialize in the cloud function. Let's make sure that would work.
@@ -2799,93 +2787,44 @@ def _get_rows_as_json_values(self) -> Block:
27992787 )
28002788
28012789 column_names .append (serialized_column_name )
2802- column_names_csv = sql .csv (map (sql .simple_literal , column_names ))
2803-
2804- # index columns count
2805- index_columns_count = len (self .index_columns )
28062790
28072791 # column references to form the array of values for the row
28082792 column_types = list (self .index .dtypes ) + list (self .dtypes )
28092793 column_references = []
28102794 for type_ , col in zip (column_types , self .expr .column_ids ):
2811- if isinstance (type_ , pd .ArrowDtype ) and pa .types .is_binary (
2812- type_ .pyarrow_dtype
2813- ):
2814- column_references .append (sql .to_json_string (col ))
2795+ if type_ == bigframes .dtypes .BYTES_DTYPE :
2796+ column_references .append (ops .ToJSONString ().as_expr (col ))
2797+ elif type_ == bigframes .dtypes .BOOL_DTYPE :
2798+ # cast operator produces True/False, but function template expects lower case
2799+ column_references .append (
2800+ ops .lower_op .as_expr (
2801+ ops .AsTypeOp (bigframes .dtypes .STRING_DTYPE ).as_expr (col )
2802+ )
2803+ )
28152804 else :
2816- column_references .append (sql .cast_as_string (col ))
2817-
2818- column_references_csv = sql .csv (column_references )
2819-
2820- # types of the columns to serialize for the row
2821- column_types_csv = sql .csv (
2822- [sql .simple_literal (str (typ )) for typ in column_types ]
2823- )
2805+ column_references .append (
2806+ ops .AsTypeOp (bigframes .dtypes .STRING_DTYPE ).as_expr (col )
2807+ )
28242808
28252809 # row dtype to use for deserializing the row as pandas series
28262810 pandas_row_dtype = bigframes .dtypes .lcd_type (* column_types )
28272811 if pandas_row_dtype is None :
28282812 pandas_row_dtype = "object"
2829- pandas_row_dtype = sql .simple_literal (str (pandas_row_dtype ))
2830-
2831- # create a json column representing row through SQL manipulation
2832- row_json_column_name = guid .generate_guid ()
2833- select_columns = (
2834- [ordering_column_name ] + list (self .index_columns ) + [row_json_column_name ]
2835- )
2836- select_columns_csv = sql .csv (
2837- [googlesql .identifier (col ) for col in select_columns ]
2838- )
2839- json_sql = f"""\
2840- With T0 AS (
2841- { textwrap .indent (expr_sql , " " )}
2842- ),
2843- T1 AS (
2844- SELECT *,
2845- TO_JSON_STRING(JSON_OBJECT(
2846- "names", [{ column_names_csv } ],
2847- "types", [{ column_types_csv } ],
2848- "values", [{ column_references_csv } ],
2849- "indexlength", { index_columns_count } ,
2850- "dtype", { pandas_row_dtype }
2851- )) AS { googlesql .identifier (row_json_column_name )} FROM T0
2852- )
2853- SELECT { select_columns_csv } FROM T1
2854- """
2855- # The only ways this code is used is through df.apply(axis=1) cope path
2856- destination , query_job = self .session ._loader ._query_to_destination (
2857- json_sql , cluster_candidates = [ordering_column_name ]
2858- )
2859- if not destination :
2860- raise ValueError (f"Query job { query_job } did not produce result table" )
2861-
2862- new_schema = (
2863- self .expr .schema .select ([* self .index_columns ])
2864- .append (
2865- bf_schema .SchemaItem (
2866- row_json_column_name , bigframes .dtypes .STRING_DTYPE
2867- )
2868- )
2869- .append (
2870- bf_schema .SchemaItem (ordering_column_name , bigframes .dtypes .INT_DTYPE )
2871- )
2872- )
2813+ pandas_row_dtype = str (pandas_row_dtype )
28732814
2874- dest_table = self .session .bqclient .get_table (destination )
2875- expr = core .ArrayValue .from_table (
2876- dest_table ,
2877- schema = new_schema ,
2878- session = self .session ,
2879- offsets_col = ordering_column_name ,
2880- n_rows = dest_table .num_rows ,
2881- ).drop_columns ([ordering_column_name ])
2882- block = Block (
2883- expr ,
2884- index_columns = self .index_columns ,
2885- column_labels = [row_json_column_name ],
2886- index_labels = self ._index_labels ,
2815+ struct_op = ops .StructOp (
2816+ column_names = ("names" , "types" , "values" , "indexlength" , "dtype" )
28872817 )
2888- return block
2818+ names_val = ex .const (tuple (column_names ))
2819+ types_val = ex .const (tuple (map (str , column_types )))
2820+ values_val = ops .ToArrayOp ().as_expr (* column_references )
2821+ indexlength_val = ex .const (len (self .index_columns ))
2822+ dtype_val = ex .const (str (pandas_row_dtype ))
2823+ struct_expr = struct_op .as_expr (
2824+ names_val , types_val , values_val , indexlength_val , dtype_val
2825+ )
2826+ block , col_id = self .project_expr (ops .ToJSONString ().as_expr (struct_expr ))
2827+ return block .select_column (col_id )
28892828
28902829
28912830class BlockIndexProperties :
0 commit comments