|
| 1 | +--- |
| 2 | +apiVersion: batch/v1 |
| 3 | +kind: Job |
| 4 | +metadata: |
| 5 | + name: create-tables-in-trino |
| 6 | +spec: |
| 7 | + template: |
| 8 | + spec: |
| 9 | + containers: |
| 10 | + - name: create-tables-in-trino |
| 11 | + image: docker.stackable.tech/stackable/testing-tools:0.2.0-stackable23.11.0 |
| 12 | + command: ["bash", "-c", "python -u /tmp/script/script.py"] |
| 13 | + volumeMounts: |
| 14 | + - name: script |
| 15 | + mountPath: /tmp/script |
| 16 | + - name: trino-static-users |
| 17 | + mountPath: /trino-static-users |
| 18 | + volumes: |
| 19 | + - name: script |
| 20 | + configMap: |
| 21 | + name: create-tables-in-trino-script |
| 22 | + - name: trino-static-users |
| 23 | + secret: |
| 24 | + secretName: trino-static-users |
| 25 | + restartPolicy: OnFailure |
| 26 | + backoffLimit: 50 |
| 27 | +--- |
| 28 | +apiVersion: v1 |
| 29 | +kind: ConfigMap |
| 30 | +metadata: |
| 31 | + name: create-tables-in-trino-script |
| 32 | +data: |
| 33 | + script.py: | |
| 34 | + import sys |
| 35 | + import trino |
| 36 | +
|
| 37 | + if not sys.warnoptions: |
| 38 | + import warnings |
| 39 | + warnings.simplefilter("ignore") |
| 40 | +
|
| 41 | + def get_connection(): |
| 42 | + connection = trino.dbapi.connect( |
| 43 | + host="trino-coordinator", |
| 44 | + port=8443, |
| 45 | + user="data-import", |
| 46 | + http_scheme='https', |
| 47 | + auth=trino.auth.BasicAuthentication("data-import", open("/trino-static-users/data-import").read()), |
| 48 | + ) |
| 49 | + connection._http_session.verify = False |
| 50 | + return connection |
| 51 | +
|
| 52 | + def run_query(connection, query): |
| 53 | + print(f"[DEBUG] Executing query {query}") |
| 54 | + cursor = connection.cursor() |
| 55 | + cursor.execute(query) |
| 56 | + return cursor.fetchall() |
| 57 | +
|
| 58 | + def run_query_and_assert_more_than_one_row(connection, query): |
| 59 | + rows = run_query(connection, query)[0][0] |
| 60 | + assert rows > 0 |
| 61 | +
|
| 62 | + connection = get_connection() |
| 63 | +
|
| 64 | + run_query(connection, "CREATE SCHEMA IF NOT EXISTS lakehouse.compliance_analytics WITH (location = 'hdfs:/lakehouse/compliance-analytics/')") |
| 65 | + run_query(connection, "CREATE SCHEMA IF NOT EXISTS lakehouse.customer_analytics WITH (location = 'hdfs:/lakehouse/customer-analytics/')") |
| 66 | + run_query(connection, "CREATE SCHEMA IF NOT EXISTS lakehouse.marketing WITH (location = 'hdfs:/lakehouse/marketing/')") |
| 67 | + run_query(connection, "CREATE SCHEMA IF NOT EXISTS lakehouse.employees WITH (location = 'hdfs:/lakehouse/employees/')") |
| 68 | +
|
| 69 | + run_query(connection, """ |
| 70 | + CREATE TABLE IF NOT EXISTS lakehouse.customer_analytics.customer AS |
| 71 | + SELECT |
| 72 | + -- char(N) not supported by Iceberg |
| 73 | + c_customer_sk, |
| 74 | + cast(c_customer_id as varchar) as c_customer_id, |
| 75 | + c_current_cdemo_sk, |
| 76 | + c_current_hdemo_sk, |
| 77 | + c_current_addr_sk, |
| 78 | + c_first_shipto_date_sk, |
| 79 | + c_first_sales_date_sk, |
| 80 | + cast(c_salutation as varchar) as c_salutation, |
| 81 | + cast(c_first_name as varchar) as c_first_name, |
| 82 | + cast(c_last_name as varchar) as c_last_name, |
| 83 | + cast(c_preferred_cust_flag as varchar) as c_preferred_cust_flag, |
| 84 | + c_birth_day, |
| 85 | + c_birth_month, |
| 86 | + c_birth_year, |
| 87 | + cast(c_birth_country as varchar) as c_birth_country, |
| 88 | + cast(c_login as varchar) as c_login, |
| 89 | + cast(c_email_address as varchar) as c_email_address, |
| 90 | + c_last_review_date_sk |
| 91 | + FROM tpcds.sf1.customer |
| 92 | + """) |
| 93 | +
|
| 94 | + run_query(connection, """ |
| 95 | + CREATE TABLE IF NOT EXISTS lakehouse.customer_analytics.customer_address AS |
| 96 | + SELECT |
| 97 | + -- char(N) not supported by Iceberg |
| 98 | + ca_address_sk, |
| 99 | + cast(ca_address_id as varchar) as ca_address_id, |
| 100 | + cast(ca_street_number as varchar) as ca_street_number, |
| 101 | + cast(ca_street_name as varchar) as ca_street_name, |
| 102 | + cast(ca_street_type as varchar) as ca_street_type, |
| 103 | + cast(ca_suite_number as varchar) as ca_suite_number, |
| 104 | + cast(ca_city as varchar) as ca_city, |
| 105 | + cast(ca_county as varchar) as ca_county, |
| 106 | + cast(ca_state as varchar) as ca_state, |
| 107 | + cast(ca_zip as varchar) as ca_zip, |
| 108 | + cast(ca_country as varchar) as ca_country, |
| 109 | + ca_gmt_offset, |
| 110 | + cast(ca_location_type as varchar) as ca_location_type |
| 111 | + FROM tpcds.sf1.customer_address |
| 112 | + """) |
| 113 | +
|
| 114 | + run_query(connection, """ |
| 115 | + CREATE TABLE IF NOT EXISTS lakehouse.customer_analytics.customer_demographics AS |
| 116 | + SELECT |
| 117 | + -- char(N) not supported by Iceberg |
| 118 | + cd_demo_sk, |
| 119 | + cast(cd_gender as varchar) as cd_gender, |
| 120 | + cast(cd_marital_status as varchar) as cd_marital_status, |
| 121 | + cast(cd_education_status as varchar) as cd_education_status, |
| 122 | + cd_purchase_estimate, |
| 123 | + cast(cd_credit_rating as varchar) as cd_credit_rating, |
| 124 | + cd_dep_count, |
| 125 | + cd_dep_employed_count, |
| 126 | + cd_dep_college_count |
| 127 | + FROM tpcds.sf1.customer_demographics |
| 128 | + """) |
| 129 | +
|
| 130 | + run_query(connection, """ |
| 131 | + CREATE TABLE IF NOT EXISTS lakehouse.customer_analytics.income_band AS |
| 132 | + SELECT |
| 133 | + ib_income_band_sk, |
| 134 | + ib_lower_bound, |
| 135 | + ib_upper_bound |
| 136 | + FROM tpcds.sf1.income_band |
| 137 | + """) |
| 138 | +
|
| 139 | + run_query(connection, """ |
| 140 | + CREATE TABLE IF NOT EXISTS lakehouse.customer_analytics.household_demographics AS |
| 141 | + SELECT |
| 142 | + -- char(N) not supported by Iceberg |
| 143 | + hd_demo_sk, |
| 144 | + hd_income_band_sk, |
| 145 | + cast(hd_buy_potential as varchar) as hd_buy_potential, |
| 146 | + hd_dep_count, |
| 147 | + hd_vehicle_count |
| 148 | + FROM tpcds.sf1.household_demographics |
| 149 | + """) |
| 150 | +
|
| 151 | + run_query(connection, """ |
| 152 | + create or replace view lakehouse.customer_analytics.table_information security invoker as |
| 153 | + with |
| 154 | + table_infos as ( |
| 155 | + select 'customer' as "table", (select count(*) from lakehouse.customer_analytics.customer) as records, (select count(*) from lakehouse.customer_analytics."customer$snapshots") as snapshots |
| 156 | + union all select 'customer_address' as "table", (select count(*) from lakehouse.customer_analytics.customer_address) as records, (select count(*) from lakehouse.customer_analytics."customer_address$snapshots") as snapshots |
| 157 | + union all select 'customer_demographics' as "table", (select count(*) from lakehouse.customer_analytics.customer_demographics) as records, (select count(*) from lakehouse.customer_analytics."customer_demographics$snapshots") as snapshots |
| 158 | + union all select 'income_band' as "table", (select count(*) from lakehouse.customer_analytics.income_band) as records, (select count(*) from lakehouse.customer_analytics."income_band$snapshots") as snapshots |
| 159 | + union all select 'household_demographics' as "table", (select count(*) from lakehouse.customer_analytics.household_demographics) as records, (select count(*) from lakehouse.customer_analytics."household_demographics$snapshots") as snapshots |
| 160 | + ), |
| 161 | + table_file_infos as ( |
| 162 | + select |
| 163 | + "table", |
| 164 | + sum(file_size_in_bytes) as size_in_bytes, |
| 165 | + count(*) as num_files, |
| 166 | + sum(file_size_in_bytes) / count(*) as avg_file_size, |
| 167 | + min(file_size_in_bytes) as min_file_size, |
| 168 | + max(file_size_in_bytes) as max_file_size |
| 169 | + from ( |
| 170 | + select 'customer' as "table", * from lakehouse.customer_analytics."customer$files" |
| 171 | + union all select 'customer_address' as "table", * from lakehouse.customer_analytics."customer_address$files" |
| 172 | + union all select 'customer_demographics' as "table", * from lakehouse.customer_analytics."customer_demographics$files" |
| 173 | + union all select 'income_band' as "table", * from lakehouse.customer_analytics."income_band$files" |
| 174 | + union all select 'household_demographics' as "table", * from lakehouse.customer_analytics."household_demographics$files" |
| 175 | + ) |
| 176 | + group by 1 |
| 177 | + ) |
| 178 | + select |
| 179 | + i."table", |
| 180 | + i.records, |
| 181 | + format_number(f.size_in_bytes) as size_in_bytes, |
| 182 | + f.num_files, |
| 183 | + format_number(f.avg_file_size) as avg_file_size, |
| 184 | + format_number(f.min_file_size) as min_file_size, |
| 185 | + format_number(f.max_file_size) as max_file_size, |
| 186 | + i.snapshots, |
| 187 | + f.size_in_bytes / i.records as avg_record_size |
| 188 | + from table_infos as i |
| 189 | + left join table_file_infos as f |
| 190 | + on i."table" = f."table" |
| 191 | + """) |
| 192 | +
|
| 193 | + run_query(connection, """ |
| 194 | + CREATE OR REPLACE VIEW lakehouse.customer_analytics.customer_enriched security invoker AS |
| 195 | + SELECT |
| 196 | + c_customer_id as customer_id, |
| 197 | + c_current_cdemo_sk as customer_demo_sk, |
| 198 | + c_current_hdemo_sk as household_demo_sk, |
| 199 | + c_salutation as salutation, |
| 200 | + c_first_name AS given_name, |
| 201 | + c_last_name AS family_name, |
| 202 | + COALESCE(c_preferred_cust_flag = 'Y', false) AS preferred_customer, |
| 203 | + CAST(date_parse(CAST(c_birth_year AS varchar) || '-' || CAST(c_birth_month AS varchar) || '-' || CAST(c_birth_day AS varchar), '%Y-%m-%d') AS date) AS birth_date, |
| 204 | + c_email_address as email_address, |
| 205 | + ca_country as country, |
| 206 | + ca_state as state, |
| 207 | + ca_zip as zip, |
| 208 | + ca_city as city, |
| 209 | + ca_county as county, |
| 210 | + ca_street_name as ca_street_name, |
| 211 | + ca_street_number as ca_street_number, |
| 212 | + ca_suite_number as suite_number, |
| 213 | + ca_location_type as location_type, |
| 214 | + ca_gmt_offset as gmt_offset |
| 215 | + FROM lakehouse.customer_analytics.customer as c |
| 216 | + LEFT JOIN lakehouse.customer_analytics.customer_address as a ON a.ca_address_sk = c.c_current_addr_sk |
| 217 | + """) |
| 218 | +
|
| 219 | + run_query(connection, """ |
| 220 | + CREATE OR REPLACE VIEW lakehouse.customer_analytics.customer_demographics_enriched security invoker AS |
| 221 | + SELECT |
| 222 | + cd_demo_sk as demo_sk, |
| 223 | + cd_gender as gender, |
| 224 | + cd_marital_status as marital_status, |
| 225 | + cd_education_status as education_status |
| 226 | + FROM lakehouse.customer_analytics.customer_demographics as d |
| 227 | + """) |
| 228 | +
|
| 229 | + run_query(connection, """ |
| 230 | + CREATE OR REPLACE VIEW lakehouse.customer_analytics.household_demographics_enriched security invoker AS |
| 231 | + SELECT |
| 232 | + hd_demo_sk as demo_sk, |
| 233 | + ib_lower_bound as income_lower_bound, |
| 234 | + ib_upper_bound as income_upper_bound, |
| 235 | + hd_buy_potential as buy_potential, |
| 236 | + hd_dep_count as dependant_count, |
| 237 | + hd_vehicle_count as vehicle_count |
| 238 | + FROM lakehouse.customer_analytics.household_demographics as d |
| 239 | + LEFT JOIN lakehouse.customer_analytics.income_band as i ON i.ib_income_band_sk = d.hd_income_band_sk |
| 240 | + """) |
| 241 | +
|
| 242 | + run_query(connection, """ |
| 243 | + CREATE OR REPLACE VIEW lakehouse.compliance_analytics.customer_enriched security invoker AS |
| 244 | + SELECT |
| 245 | + c_customer_id as customer_id, |
| 246 | + c_salutation as salutation, |
| 247 | + COALESCE(c_preferred_cust_flag = 'Y', false) AS preferred_customer, |
| 248 | + c_birth_year as birth_year, |
| 249 | + c_email_address as email_address, |
| 250 | + ca_country as country, |
| 251 | + ca_state as state, |
| 252 | + ca_zip as zip, |
| 253 | + ca_city as city, |
| 254 | + ca_gmt_offset as gmt_offset, |
| 255 | + cd_gender as gender, |
| 256 | + cd_marital_status as marital_status |
| 257 | + FROM lakehouse.customer_analytics.customer as c |
| 258 | + LEFT JOIN lakehouse.customer_analytics.customer_address as a ON a.ca_address_sk = c.c_current_addr_sk |
| 259 | + LEFT JOIN lakehouse.customer_analytics.customer_demographics as cd ON cd.cd_demo_sk = c.c_current_cdemo_sk |
| 260 | + """) |
| 261 | +
|
| 262 | + run_query(connection, """ |
| 263 | + CREATE TABLE IF NOT EXISTS lakehouse.employees.employees AS |
| 264 | + SELECT 'william.lewis' as username, 'William' as given_name, 'Lewis' as family_name, '[email protected]' as email, NULL as supervisor, 65000 as salary |
| 265 | + UNION ALL SELECT 'sophia.clarke', 'Sophia', 'Clarke', '[email protected]', 'william.lewis', 60000 |
| 266 | + UNION ALL SELECT 'daniel.king', 'Daniel', 'King', '[email protected]', 'william.lewis', 60000 |
| 267 | + UNION ALL SELECT 'pamela.scott', 'Pamela', 'Scott', '[email protected]', NULL, 70000 |
| 268 | + UNION ALL SELECT 'justin.martin', 'Justin', 'Martin', '[email protected]', 'pamela.scott', 65000 |
| 269 | + UNION ALL SELECT 'sophia.clarke', 'Sophia', 'Clarke', '[email protected]', 'pamela.scott', 65000 |
| 270 | + UNION ALL SELECT 'mark.ketting', 'Mark', 'Ketting', '[email protected]', NULL, 60000 |
| 271 | + """) |
0 commit comments