Skip to content

[Feature] parallel insert and merge statements #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
tmirko opened this issue Mar 6, 2023 · 2 comments · Fixed by #80
Closed

[Feature] parallel insert and merge statements #77

tmirko opened this issue Mar 6, 2023 · 2 comments · Fixed by #80
Assignees
Labels
enhancement New feature or request

Comments

@tmirko
Copy link

tmirko commented Mar 6, 2023

Describe the Feature

I would like to have parallel hints added to insert and merge statements based on assigned parameter.

I've achieved this altering oracle__get_incremental_merge_sql macro:

{# added parallel param and parallel hint on line 18 and 34 #} {% macro oracle__get_incremental_merge_sql(args_dict) %} {%- set parallel = config.get('parallel', none) -%} {%- set dest_columns = args_dict["dest_columns"] -%} {%- set temp_relation = args_dict["temp_relation"] -%} {%- set target_relation = args_dict["target_relation"] -%} {%- set unique_key = args_dict["unique_key"] -%} {%- set dest_column_names = dest_columns | map(attribute='name') | list -%} {%- set dest_cols_csv = get_quoted_column_csv(model, dest_column_names) -%} {%- set merge_update_columns = config.get('merge_update_columns') -%} {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} {%- set incremental_predicates = args_dict["incremental_predicates"] -%} {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} {%- if unique_key -%} {%- set unique_key_result = oracle_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates) -%} {%- set unique_key_list = unique_key_result['unique_key_list'] -%} {%- set unique_key_merge_predicates = unique_key_result['unique_key_merge_predicates'] -%} merge into {% if parallel %} /*+parallel({{ parallel }})*/ {% endif %} {{ target_relation }} DBT_INTERNAL_DEST using {{ temp_relation }} DBT_INTERNAL_SOURCE on ({{ unique_key_merge_predicates | join(' AND ') }}) when matched then update set {% for col in update_columns if (col.upper() not in unique_key_list and col not in unique_key_list) -%} DBT_INTERNAL_DEST.{{ col }} = DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %} {% endfor -%} when not matched then insert({{ dest_cols_csv }}) values( {% for col in dest_columns -%} DBT_INTERNAL_SOURCE.{{ adapter.check_and_quote_identifier(col.name, model.columns) }}{% if not loop.last %}, {% endif %} {% endfor -%} ) {%- else -%} insert into {% if parallel %} /*+parallel({{ parallel }})*/ {% endif %} {{ target_relation }} ({{ dest_cols_csv }}) ( select {{ dest_cols_csv }} from {{ temp_relation }} ) {%- endif -%} {% endmacro %}

Describe alternatives you've considered

No response

Who will this benefit?

No response

Anything else?

No response

@tmirko tmirko added the enhancement New feature or request label Mar 6, 2023
@aosingh aosingh self-assigned this Mar 6, 2023
@aosingh aosingh mentioned this issue Mar 22, 2023
@tmirko
Copy link
Author

tmirko commented Mar 23, 2023

@aosingh Thanks for implementing this!

Unfortunately, I've put the parallel hint on the wrong place in the code above, it should be placed after merge/insert keywords instead of after into. Can you please correct this in the next update? Code should look like as below:

{# added parallel param and parallel hint on line 18 and 34 #} {% macro oracle__get_incremental_merge_sql(args_dict) %} {%- set parallel = config.get('parallel', none) -%} {%- set dest_columns = args_dict["dest_columns"] -%} {%- set temp_relation = args_dict["temp_relation"] -%} {%- set target_relation = args_dict["target_relation"] -%} {%- set unique_key = args_dict["unique_key"] -%} {%- set dest_column_names = dest_columns | map(attribute='name') | list -%} {%- set dest_cols_csv = get_quoted_column_csv(model, dest_column_names) -%} {%- set merge_update_columns = config.get('merge_update_columns') -%} {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} {%- set incremental_predicates = args_dict["incremental_predicates"] -%} {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} {%- if unique_key -%} {%- set unique_key_result = oracle_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates) -%} {%- set unique_key_list = unique_key_result['unique_key_list'] -%} {%- set unique_key_merge_predicates = unique_key_result['unique_key_merge_predicates'] -%} merge {% if parallel %} /*+parallel({{ parallel }})*/ {% endif %} into {{ target_relation }} DBT_INTERNAL_DEST using {{ temp_relation }} DBT_INTERNAL_SOURCE on ({{ unique_key_merge_predicates | join(' AND ') }}) when matched then update set {% for col in update_columns if (col.upper() not in unique_key_list and col not in unique_key_list) -%} DBT_INTERNAL_DEST.{{ col }} = DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %} {% endfor -%} when not matched then insert({{ dest_cols_csv }}) values( {% for col in dest_columns -%} DBT_INTERNAL_SOURCE.{{ adapter.check_and_quote_identifier(col.name, model.columns) }}{% if not loop.last %}, {% endif %} {% endfor -%} ) {%- else -%} insert {% if parallel %} /*+parallel({{ parallel }})*/ {% endif %} into {{ target_relation }} ({{ dest_cols_csv }}) ( select {{ dest_cols_csv }} from {{ temp_relation }} ) {%- endif -%} {% endmacro %}

@aosingh
Copy link
Member

aosingh commented Mar 23, 2023

Thanks @Mirko-T for pointing this out. I have updated it and raised a PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants