Skip to content

Commit c0cce32

Browse files
feat: Add partition_by(), deprecate partition_by_key() and partition_by_max_size() (#299)
1 parent a416e77 commit c0cce32

22 files changed

Lines changed: 1313 additions & 1069 deletions

.github/workflows/R-CMD-check.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ jobs:
5555
- uses: r-lib/actions/setup-r@v2
5656
with:
5757
use-public-rspm: true
58-
extra-repositories: 'https://community.r-multiverse.org'
58+
extra-repositories: 'https://rpolars.r-universe.org'
59+
# extra-repositories: 'https://community.r-multiverse.org'
5960

6061
- name: Install R dependencies
6162
uses: r-lib/actions/setup-r-dependencies@v2

.github/workflows/covr.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ jobs:
4242
- uses: r-lib/actions/setup-r@v2
4343
with:
4444
use-public-rspm: true
45-
extra-repositories: 'https://community.r-multiverse.org'
45+
extra-repositories: 'https://rpolars.r-universe.org'
46+
# extra-repositories: 'https://community.r-multiverse.org'
4647

4748
- uses: r-lib/actions/setup-r-dependencies@v2
4849
with:

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ export(is_polars_df)
9494
export(is_polars_expr)
9595
export(is_polars_lf)
9696
export(make_unique_id)
97+
export(partition_by)
9798
export(partition_by_key)
9899
export(partition_by_max_size)
99100
export(read_csv_polars)

NEWS.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@
3737
* New argument `mkdir` in `write_parquet_polars()` (this already existed in
3838
`sink_parquet()`). (#298)
3939

40+
* New (experimental) function `partition_by()` to write partitioned output in
41+
`sink_*()` and `write_*_polars()`. The following functions are deprecated and
42+
will be removed in a future release (#299):
43+
44+
- `partition_by_key()` can be replaced with `partition_by(key =)`
45+
- `partition_by_max_size()` can be replaced with `partition_by(max_rows_per_file =)`
46+
4047
## Changes
4148

4249
* `collect()` now returns a `tibble` instead of a `data.frame`, for consistency

R/partitioned-output.R

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#' Helper functions to export data as a partitioned output
2+
#'
3+
#' @description
4+
#' `r lifecycle::badge("experimental")`
5+
#'
6+
#' Partitioning schemes are used to write multiple files with `sink_*()` and
7+
#' `write_*_polars()` functions.
8+
#'
9+
#' - `partition_by()`: Configuration for writing to multiple output files.
10+
#' Supports partitioning by key, file size limits, or both.
11+
#'
12+
#' The following functions are deprecated and will be removed in a future release:
13+
#'
14+
#' - `r lifecycle::badge("deprecated")` `partition_by_key()`: use
15+
#' `partition_by(key = ...)` instead.
16+
#' - `r lifecycle::badge("deprecated")` `partition_by_max_size()`: use
17+
#' `partition_by(max_rows_per_file = ...)` instead.
18+
#'
19+
#' @inheritParams rlang::args_dots_empty
20+
#' @param base_path The base path for the output files. Use the `mkdir` option
21+
#' of the `sink_*()` or `write_*_polars()` functions to ensure directories in the
22+
#' path are created.
23+
#' @param key Something that can be coerced to a list of Polars expressions. Used to
24+
#' partition by.
25+
#' @param include_key A bool indicating whether to include the key columns in
26+
#' the output files. Can only be used if `key` is specified, otherwise should
27+
#' be `NULL`.
28+
#' @param max_rows_per_file An integer-ish value indicating the maximum size in
29+
#' rows of each of the generated files.
30+
#' @param approximate_bytes_per_file An integer-ish value indicating the approximate
31+
#' number of bytes to write to each file, or `NULL`. This is measured as the
32+
#' estimated size of the DataFrame in memory. Defaults to approximately 4GB when
33+
#' `key` is specified without `max_rows_per_file`, otherwise unlimited.
34+
#' @param by `r lifecycle::badge("deprecated")` Something that can be coerced to a
35+
#' list of Polars expressions. Used to partition by. Use the `key` argument of
36+
#' `partition_by()` instead.
37+
#' @param per_partition_sort_by `r lifecycle::badge("deprecated")` Something
38+
#' that can be coerced to a list of Polars expressions, or `NULL` (default).
39+
#' Used to sort within each partition. Note that `partition_by()` does not
39+
#' have an equivalent argument.
41+
#' @param max_size `r lifecycle::badge("deprecated")` An integer-ish value
42+
#' indicating the maximum size in rows of each of the generated files. Use the
43+
#' `max_rows_per_file` argument of `partition_by()` instead.
44+
#'
45+
#' @name partitioned_output
46+
#' @export
47+
partition_by <- function(
48+
base_path,
49+
...,
50+
key = NULL,
51+
include_key = NULL,
52+
max_rows_per_file = NULL,
53+
approximate_bytes_per_file = NULL
54+
) {
55+
check_dots_empty()
56+
pl$PartitionBy(
57+
base_path = base_path,
58+
key = key,
59+
include_key = include_key,
60+
max_rows_per_file = max_rows_per_file,
61+
approximate_bytes_per_file = approximate_bytes_per_file
62+
)
63+
}
64+
65+
#' @rdname partitioned_output
66+
#' @export
67+
partition_by_key <- function(
68+
base_path,
69+
...,
70+
by,
71+
include_key = TRUE,
72+
per_partition_sort_by = NULL
73+
) {
74+
check_dots_empty()
75+
76+
lifecycle::deprecate_warn(
77+
when = "0.16.0",
78+
what = "partition_by_key()",
79+
details = "Please use `partition_by(key = )` instead.",
80+
always = TRUE,
81+
)
82+
83+
suppressWarnings(
84+
pl$PartitionByKey(
85+
base_path = base_path,
86+
by = by,
87+
include_key = include_key,
88+
per_partition_sort_by = per_partition_sort_by
89+
)
90+
)
91+
}
92+
93+
#' @rdname partitioned_output
94+
#' @export
95+
partition_by_max_size <- function(
96+
base_path,
97+
...,
98+
max_size,
99+
per_partition_sort_by = NULL
100+
) {
101+
check_dots_empty()
102+
103+
lifecycle::deprecate_warn(
104+
when = "0.16.0",
105+
what = "partition_by_max_size()",
106+
details = "Please use `partition_by(max_rows_per_file = )` instead.",
107+
always = TRUE,
108+
)
109+
suppressWarnings(
110+
pl$PartitionMaxSize(
111+
base_path = base_path,
112+
max_size = max_size,
113+
per_partition_sort_by = per_partition_sort_by
114+
)
115+
)
116+
}

R/sink.R

Lines changed: 3 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -57,33 +57,9 @@
5757
#'
5858
#' ## Partitioned output
5959
#'
60-
#' It is possible to export a LazyFrame to multiple files, also called
61-
#' *partitioned output*. A partition can be determined in several ways:
62-
#'
63-
#' - by key(s): split by the values of keys. The amount of files that can be
64-
#' written is not limited. However, when writing beyond a certain amount of
65-
#' files, the data for the remaining partitions is buffered before writing to
66-
#' the file.
67-
#' - by maximum number of rows: if the number of rows in a file reaches the
68-
#' maximum number of rows, the file is closed and a new file is opened.
69-
70-
# TODO: add this back when https://github.com/pola-rs/r-polars/issues/1522 is
71-
# solved
72-
# - by "sorted partition": this is a specialized version of partitioning by
73-
# key. Whereas partitioning by key accepts data in any order, this scheme
74-
# expects the input data to be pre-grouped or pre-sorted. This scheme suffers
75-
# a lot less overhead, but may not be always applicable. Each new value of
76-
# the key expressions starts a new partition, therefore repeating the same
77-
# value multiple times may overwrite previous partitions.
78-
# These partitioning schemes can be used with the functions `partition_by_key()`,
79-
# `partition_by_max_size()`, and `partition_parted()`. See Examples below.
80-
81-
#'
82-
#' These partitioning schemes can be used with the functions `partition_by_key()`
83-
#' and `partition_by_max_size()`. See Examples below.
84-
#'
85-
#' Writing a partitioned output usually requires setting `mkdir = TRUE` to
86-
#' automatically create the required subfolders.
60+
#' It is possible to export data to multiple files based on various parameters,
61+
#' such as the values of some variables, or such that each file has a maximum
62+
#' number of rows. See [partition_by()] for more details.
8763
#'
8864
#' @return The input LazyFrame.
8965
#' @export
@@ -124,22 +100,6 @@
124100
#' out_path <- withr::local_tempdir()
125101
#' sink_parquet(my_lf, partition_by_max_size(out_path, max_size = 5), mkdir = TRUE)
126102
#' fs::dir_tree(out_path) # mtcars has 32 rows so we have 7 output files
127-
128-
# TODO: add this back when https://github.com/pola-rs/r-polars/issues/1522 is
129-
# solved
130-
#
131-
# # Split the LazyFrame by pre-sorted data:
132-
# out_path <- withr::local_tempdir()
133-
# my_lf |>
134-
# arrange(am, cyl) |>
135-
# sink_parquet(partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
136-
#
137-
# fs::dir_tree(out_path)
138-
#
139-
# # Careful when using partition_parted(): if the data is not presorted then
140-
# # the output files may be incorrect!
141-
# out_path <- withr::local_tempdir()
142-
# sink_parquet(my_lf, partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
143103
sink_parquet <- function(
144104
.data,
145105
path,
@@ -264,22 +224,6 @@ sink_parquet <- function(
264224
#' out_path <- withr::local_tempdir()
265225
#' sink_csv(my_lf, partition_by_max_size(out_path, max_size = 5), mkdir = TRUE)
266226
#' fs::dir_tree(out_path) # mtcars has 32 rows so we have 7 output files
267-
268-
# TODO: add this back when https://github.com/pola-rs/r-polars/issues/1522 is
269-
# solved
270-
#
271-
# # Split the LazyFrame by pre-sorted data:
272-
# out_path <- withr::local_tempdir()
273-
# my_lf |>
274-
# arrange(am, cyl) |>
275-
# sink_csv(partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
276-
#
277-
# fs::dir_tree(out_path)
278-
#
279-
# # Careful when using partition_parted(): if the data is not presorted then
280-
# # the output files may be incorrect!
281-
# out_path <- withr::local_tempdir()
282-
# sink_csv(my_lf, partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
283227
sink_csv <- function(
284228
.data,
285229
path,
@@ -417,22 +361,6 @@ sink_csv <- function(
417361
#' out_path <- withr::local_tempdir()
418362
#' sink_ipc(my_lf, partition_by_max_size(out_path, max_size = 5), mkdir = TRUE)
419363
#' fs::dir_tree(out_path) # mtcars has 32 rows so we have 7 output files
420-
421-
# TODO: add this back when https://github.com/pola-rs/r-polars/issues/1522 is
422-
# solved
423-
#
424-
# # Split the LazyFrame by pre-sorted data:
425-
# out_path <- withr::local_tempdir()
426-
# my_lf |>
427-
# arrange(am, cyl) |>
428-
# sink_ipc(partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
429-
#
430-
# fs::dir_tree(out_path)
431-
#
432-
# # Careful when using partition_parted(): if the data is not presorted then
433-
# # the output files may be incorrect!
434-
# out_path <- withr::local_tempdir()
435-
# sink_ipc(my_lf, partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
436364
sink_ipc <- function(
437365
.data,
438366
path,
@@ -520,22 +448,6 @@ sink_ipc <- function(
520448
#' out_path <- withr::local_tempdir()
521449
#' sink_ndjson(my_lf, partition_by_max_size(out_path, max_size = 5), mkdir = TRUE)
522450
#' fs::dir_tree(out_path) # mtcars has 32 rows so we have 7 output files
523-
524-
# TODO: add this back when https://github.com/pola-rs/r-polars/issues/1522 is
525-
# solved
526-
#
527-
# # Split the LazyFrame by pre-sorted data:
528-
# out_path <- withr::local_tempdir()
529-
# my_lf |>
530-
# arrange(am, cyl) |>
531-
# sink_ndjson(partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
532-
#
533-
# fs::dir_tree(out_path)
534-
#
535-
# # Careful when using partition_parted(): if the data is not presorted then
536-
# # the output files may be incorrect!
537-
# out_path <- withr::local_tempdir()
538-
# sink_ndjson(my_lf, partition_parted(out_path, by = c("am", "cyl")), mkdir = TRUE)
539451
sink_ndjson <- function(
540452
.data,
541453
path,
@@ -572,74 +484,3 @@ sink_ndjson <- function(
572484
mkdir = mkdir
573485
)
574486
}
575-
576-
#' Helper functions to export a LazyFrame as a partitioned output
577-
#'
578-
#' `r lifecycle::badge("experimental")`
579-
#' More details and examples in the documentation of `sink_*()` functions.
580-
#'
581-
#' @inheritParams rlang::args_dots_empty
582-
#' @param base_path The base path for the output files. Use the `mkdir` option
583-
#' of the `sink_*` methods to ensure directories in the path are created.
584-
#' @param by Something can be coerced to a list of Polars expressions. Used to
585-
#' partition by.
586-
#' @param include_key If `TRUE` (default), include the key columns in the output
587-
#' files.
588-
#' @param per_partition_sort_by Something can be coerced to a list of Polars
589-
#' expressions, or `NULL` (default). Used to sort over within each partition.
590-
#' Note that this might increase the memory consumption needed for each partition.
591-
#' @param max_size An integer-ish value indicating the maximum number of rows in
592-
#' each of the generated files.
593-
#'
594-
#' @name partitioned_output
595-
#' @export
596-
partition_by_key <- function(
597-
base_path,
598-
...,
599-
by,
600-
include_key = TRUE,
601-
per_partition_sort_by = NULL
602-
) {
603-
check_dots_empty()
604-
pl$PartitionByKey(
605-
base_path = base_path,
606-
by = by,
607-
include_key = TRUE,
608-
per_partition_sort_by = NULL
609-
)
610-
}
611-
612-
#' @rdname partitioned_output
613-
#' @export
614-
partition_by_max_size <- function(
615-
base_path,
616-
...,
617-
max_size,
618-
per_partition_sort_by = NULL
619-
) {
620-
check_dots_empty()
621-
pl$PartitionMaxSize(
622-
base_path = base_path,
623-
max_size = max_size,
624-
per_partition_sort_by = NULL
625-
)
626-
}
627-
628-
# TODO: add this back when https://github.com/pola-rs/r-polars/issues/1522 is
629-
# solved
630-
# @export
631-
partition_parted <- function(
632-
base_path,
633-
...,
634-
by,
635-
include_key = TRUE,
636-
per_partition_sort_by = NULL
637-
) {
638-
check_dots_empty()
639-
pl$PartitionParted(
640-
base_path = base_path,
641-
by = by,
642-
include_key = TRUE,
643-
per_partition_sort_by = NULL
644-
)
645-
}

0 commit comments

Comments
 (0)