From f4f284443f1c7fc5ca47f292497d8ad19c358089 Mon Sep 17 00:00:00 2001
From: "mingliang.gml"
Date: Sun, 9 Feb 2020 22:16:10 +0800
Subject: [PATCH 1/5] Add concat_column to elasticdl feature column

---
 .../feature_column/feature_column.py | 134 ++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/elasticdl/python/elasticdl/feature_column/feature_column.py b/elasticdl/python/elasticdl/feature_column/feature_column.py
index aa3e1fa28..04ad7babb 100644
--- a/elasticdl/python/elasticdl/feature_column/feature_column.py
+++ b/elasticdl/python/elasticdl/feature_column/feature_column.py
@@ -1,6 +1,8 @@
 import collections
+import itertools
 import math
 
+import tensorflow as tf
 from tensorflow.python.feature_column import feature_column as fc_old
 from tensorflow.python.feature_column import feature_column_v2 as fc_lib
 from tensorflow.python.framework import tensor_shape
@@ -185,3 +187,135 @@ def reset(self):
     @property
     def embedding_and_ids(self):
         return self._embedding_delegate.embedding_and_ids
+
+
+def concat_column(categorical_columns):
+    if not isinstance(categorical_columns, list):
+        raise ValueError("categorical_columns should be a list")
+
+    if not categorical_columns:
+        raise ValueError("categorical_columns shouldn't be empty")
+
+    for column in categorical_columns:
+        if not isinstance(column, fc_lib.CategoricalColumn):
+            raise ValueError(
+                "Items of categorical_columns should be CategoricalColumn."
+                " Given: {}".format(column)
+            )
+
+    return ConcatColumn(categorical_columns=tuple(categorical_columns))
+
+
+class ConcatColumn(
+    fc_lib.CategoricalColumn,
+    fc_old._CategoricalColumn,
+    collections.namedtuple("ConcatColumn", ("categorical_columns",)),
+):
+    def __init__(self, **kwargs):
+        # Calculate the id offset of each leaf categorical column
+        total_num_buckets = 0
+        leaf_column_num_buckets = []
+        for categorical_column in self.categorical_columns:
+            leaf_column_num_buckets.append(categorical_column.num_buckets)
+            total_num_buckets += categorical_column.num_buckets
+        self.accumulated_offsets = list(
+            itertools.accumulate([0] + leaf_column_num_buckets[:-1])
+        )
+        self.total_num_buckets = total_num_buckets
+
+    @property
+    def _is_v2_column(self):
+        for categorical_column in self.categorical_columns:
+            if not categorical_column._is_v2_column:
+                return False
+
+        return True
+
+    @property
+    def name(self):
+        feature_names = []
+        for categorical_column in self.categorical_columns:
+            feature_names.append(categorical_column.name)
+
+        return "_C_".join(sorted(feature_names))
+
+    @property
+    def num_buckets(self):
+        return self.total_num_buckets
+
+    @property
+    def _num_buckets(self):
+        return self.total_num_buckets
+
+    def transform_feature(self, transformation_cache, state_manager):
+        feature_tensors = []
+        for categorical_column in self.categorical_columns:
+            ids_and_weights = categorical_column.get_sparse_tensors(
+                transformation_cache, state_manager
+            )
+            feature_tensors.append(ids_and_weights.id_tensor)
+
+        feature_tensors_with_offset = []
+        for index, offset in enumerate(self.accumulated_offsets):
+            feature_tensor = feature_tensors[index]
+            feature_tensor_with_offset = tf.SparseTensor(
+                indices=feature_tensor.indices,
+                values=tf.cast(
+                    tf.add(feature_tensor.values, offset), tf.int32
+                ),
+                dense_shape=feature_tensor.dense_shape,
+            )
+            feature_tensors_with_offset.append(feature_tensor_with_offset)
+
+        return tf.sparse.concat(axis=-1, sp_inputs=feature_tensors_with_offset)
+
+    def get_sparse_tensors(self, transformation_cache, state_manager):
+        return fc_lib.CategoricalColumn.IdWeightPair(
+            transformation_cache.get(self, state_manager), None
+        )
+
+    @property
+    def parents(self):
+        return list(self.categorical_columns)
+
+    @property
+    def parse_example_spec(self):
+        config = {}
+        for categorical_column in self.categorical_columns:
+            config.update(categorical_column.parse_example_spec)
+
+        return config
+
+    @property
+    def _parse_example_spec(self):
+        return self.parse_example_spec
+
+    def get_config(self):
+        from tensorflow.python.feature_column.serialization import (
+            serialize_feature_column,
+        )  # pylint: disable=g-import-not-at-top
+
+        config = dict(zip(self._fields, self))
+        config["categorical_columns"] = tuple(
+            [serialize_feature_column(fc) for fc in self.categorical_columns]
+        )
+
+        return config
+
+    @classmethod
+    def from_config(cls, config, custom_objects=None, columns_by_name=None):
+        """See `FeatureColumn` base class."""
+        from tensorflow.python.feature_column.serialization import (
+            deserialize_feature_column,
+        )  # pylint: disable=g-import-not-at-top
+
+        fc_lib._check_config_keys(config, cls._fields)
+        kwargs = fc_lib._standardize_and_copy_config(config)
+        kwargs["categorical_columns"] = tuple(
+            [
+                deserialize_feature_column(c, custom_objects, columns_by_name)
+                for c in config["categorical_columns"]
+            ]
+        )
+
+        return cls(**kwargs)

From b7f151038a22ab5c08a63955f887c3374019302b Mon Sep 17 00:00:00 2001
From: "mingliang.gml"
Date: Mon, 10 Feb 2020 20:28:23 +0800
Subject: [PATCH 2/5] Update the output type of concat_column from tf.int32 to tf.int64

---
 elasticdl/python/elasticdl/feature_column/feature_column.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/elasticdl/python/elasticdl/feature_column/feature_column.py b/elasticdl/python/elasticdl/feature_column/feature_column.py
index 04ad7babb..7a2c9156a 100644
--- a/elasticdl/python/elasticdl/feature_column/feature_column.py
+++ b/elasticdl/python/elasticdl/feature_column/feature_column.py
@@ -261,7 +261,7 @@ def transform_feature(self, transformation_cache, state_manager):
             feature_tensor_with_offset = tf.SparseTensor(
                 indices=feature_tensor.indices,
                 values=tf.cast(
-                    tf.add(feature_tensor.values, offset), tf.int32
+                    tf.add(feature_tensor.values, offset), tf.int64
                 ),
                 dense_shape=feature_tensor.dense_shape,
             )

From 03c734f48953cd860e1ec999c0db8209824fa4a9 Mon Sep 17 00:00:00 2001
From: "mingliang.gml"
Date: Mon, 10 Feb 2020 20:33:59 +0800
Subject: [PATCH 3/5] Add a placeholder for the concat_column test case

---
 elasticdl/python/tests/feature_column_test.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/elasticdl/python/tests/feature_column_test.py b/elasticdl/python/tests/feature_column_test.py
index ba1c18627..2a24ec622 100644
--- a/elasticdl/python/tests/feature_column_test.py
+++ b/elasticdl/python/tests/feature_column_test.py
@@ -253,6 +253,8 @@ def _mock_gather_embedding(name, ids):
             np.isclose(grad_values.numpy(), expected_grads).all()
         )
 
+    def test_concat_column(self):
+        pass
 
 if __name__ == "__main__":
     unittest.main()

From d286ceb43d4719448efa0aa03a9bd3fcf8f5e8a0 Mon Sep 17 00:00:00 2001
From: "mingliang.gml"
Date: Mon, 10 Feb 2020 21:00:50 +0800
Subject: [PATCH 4/5] Add a test case for concat_column

---
 elasticdl/python/tests/feature_column_test.py | 29 ++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/elasticdl/python/tests/feature_column_test.py b/elasticdl/python/tests/feature_column_test.py
index 2a24ec622..073cec439 100644
--- a/elasticdl/python/tests/feature_column_test.py
+++ b/elasticdl/python/tests/feature_column_test.py
@@ -254,7 +254,34 @@ def _mock_gather_embedding(name, ids):
         )
 
     def test_concat_column(self):
-        pass
+        user_id = tf.feature_column.categorical_column_with_identity(
+            "user_id", num_buckets=32
+        )
+
+        item_id = tf.feature_column.categorical_column_with_identity(
+            "item_id", num_buckets=128
+        )
+
+        item_id_user_id_concat = feature_column.concat_column(
+            [user_id, item_id]
+        )
+
+        concat_indicator = tf.feature_column.indicator_column(
+            item_id_user_id_concat
+        )
+
+        output = call_feature_columns(
+            [concat_indicator], {"user_id": [10, 20], "item_id": [1, 120]},
+        )
+
+        expected_output = tf.one_hot(indices=[10, 20], depth=160) + tf.one_hot(
+            indices=[1 + 32, 120 + 32], depth=160
+        )
+
+        self.assertTrue(
+            np.array_equal(output.numpy(), expected_output.numpy())
+        )
+
 
 if __name__ == "__main__":
     unittest.main()

From 9c28e5c0b7807ca0c431ac18d582644b58f47d45 Mon Sep 17 00:00:00 2001
From: "mingliang.gml"
Date: Wed, 12 Feb 2020 07:15:28 +0800
Subject: [PATCH 5/5] Add a syntax example for the wide and deep model

---
 docs/designs/data_transform.md | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/docs/designs/data_transform.md b/docs/designs/data_transform.md
index 9b51ae03b..cf4b701dd 100644
--- a/docs/designs/data_transform.md
+++ b/docs/designs/data_transform.md
@@ -191,6 +191,24 @@ LABEL label
 
 It trains a DNN model to classify someone's income level using the [census income dataset](https://archive.ics.uci.edu/ml/datasets/Census+Income). The transform expression is `COLUMN NUMERIC(NORMALIZE(capital_gain)), NUMERIC(STANDARDIZE(age)), EMBEDDING(BUCKETIZE(hours_per_week, bucket_num=5), dim=32)`. It will normalize the column *capital_gain*, standardize the column *age*, bucketize the column *hours_per_week* to 5 buckets and then map it to an embedding vector.
 
+Next, let's look at a more complicated scenario. The following SQL statement trains a [wide and deep model](https://ai.googleblog.com/2016/06/wide-deep-learning-better-together-with.html) using the same dataset.
+
+```SQL
+SELECT *
+FROM census_income
+TO TRAIN WideAndDeepClassifier
+COLUMN
+  EMBEDDING(CONCAT(VOCABULARIZE(workclass), BUCKETIZE(capital_gain, bucket_num=5), BUCKETIZE(capital_loss, bucket_num=5), BUCKETIZE(hours_per_week, bucket_num=6)) AS group_1, 8),
+  EMBEDDING(CONCAT(HASH(education), HASH(occupation), VOCABULARIZE(marital_status), VOCABULARIZE(relationship)) AS group_2, 8),
+  EMBEDDING(CONCAT(BUCKETIZE(age, bucket_num=5), HASH(native_country), VOCABULARIZE(race), VOCABULARIZE(sex)) AS group_3, 8)
+  FOR deep_embeddings
+COLUMN
+  EMBEDDING(group_1, 1),
+  EMBEDDING(group_2, 1)
+  FOR wide_embeddings
+LABEL label
+```
+
 *Please check the [discussion](https://github.com/sql-machine-learning/elasticdl/issues/1664).*
 
 SQLFlow will convert the `COLUMN` expression to Python code of data transformation. But it requires some parameters which are derived from the data. So next we will do the analysis work.
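
Taken together, the five patches introduce `concat_column`, which fuses several categorical columns into a single id space: each column's ids are shifted by the accumulated `num_buckets` of the columns before it, so the concatenated column has the sum of the bucket counts and the id ranges never collide. Below is a minimal usage sketch, not part of the patch series; it assumes TensorFlow 2.x in eager mode and the module path shown in the diffs above, and uses `tf.keras.layers.DenseFeatures` in place of the test helper `call_feature_columns`:

```python
import numpy as np
import tensorflow as tf

from elasticdl.python.elasticdl.feature_column import feature_column as edl_fc

# Two identity categorical columns with different bucket counts.
user_id = tf.feature_column.categorical_column_with_identity(
    "user_id", num_buckets=32
)
item_id = tf.feature_column.categorical_column_with_identity(
    "item_id", num_buckets=128
)

# concat_column keeps user_id ids in [0, 32) and shifts item_id ids
# by 32 into [32, 160), so the combined column has 32 + 128 buckets.
concat = edl_fc.concat_column([user_id, item_id])
assert concat.num_buckets == 160

# Multi-hot encode the concatenated column, mirroring the test case above.
indicator = tf.feature_column.indicator_column(concat)
layer = tf.keras.layers.DenseFeatures([indicator])
output = layer({"user_id": [10, 20], "item_id": [1, 120]})

# Row 0 is hot at 10 (user_id=10) and 33 (item_id=1 shifted by 32);
# row 1 is hot at 20 and 152 (item_id=120 shifted by 32).
expected = tf.one_hot([10, 20], depth=160) + tf.one_hot([33, 152], depth=160)
assert np.array_equal(output.numpy(), expected.numpy())
```

Because the shifted id ranges are disjoint, one embedding table can serve all of the concatenated columns; with `dimension=1`, as in the `wide_embeddings` part of the SQL example, the embedding degenerates into the per-feature weights of the wide part of the model.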