Introduction to prt

The prt object introduced by this package is intended to represent tabular data stored as one or more fst files. This is in similar spirit as disk.frame, but is much less ambitious in scope and therefore much simpler in implementation. While the disk.frame package attempts to provide a dplyr compliant API and offers parallel computation via the future package, the intended use-case for prt objects is the situation where only a (small) subset of rows of the (large) tabular dataset are of interest for analysis at once. This subset can be specified using the base generic function subset() and the selected data is read into memory as a data.table object. Subsequent data operations and analysis is then preformed on this data.table representation. For this reason, partition-level parallelism is not in-scope for prt as fst already provides an efficient shared memory parallel implementation for decompression. Furthermore the much more complex multi-function non-standard evaluation API provided by dplyr was forgone in favor of the very simple one-function approach presented by the base R S3 generic function subset().

For the purpose of illustration of some prt features and particularities, we instantiate a dataset as data.table object and create a temporary directory which will contain the file-based data back ends.

tmp <- tempfile()
dir.create((tmp))

dat <- data.table::setDT(nycflights13::flights)
print(dat)
#>         year month day dep_time sched_dep_time dep_delay arr_time
#>      1: 2013     1   1      517            515         2      830
#>      2: 2013     1   1      533            529         4      850
#>      3: 2013     1   1      542            540         2      923
#>      4: 2013     1   1      544            545        -1     1004
#>      5: 2013     1   1      554            600        -6      812
#>     ---                                                          
#> 336772: 2013     9  30       NA           1455        NA       NA
#> 336773: 2013     9  30       NA           2200        NA       NA
#> 336774: 2013     9  30       NA           1210        NA       NA
#> 336775: 2013     9  30       NA           1159        NA       NA
#> 336776: 2013     9  30       NA            840        NA       NA
#>         sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#>      1:            819        11      UA   1545  N14228    EWR  IAH      227
#>      2:            830        20      UA   1714  N24211    LGA  IAH      227
#>      3:            850        33      AA   1141  N619AA    JFK  MIA      160
#>      4:           1022       -18      B6    725  N804JB    JFK  BQN      183
#>      5:            837       -25      DL    461  N668DN    LGA  ATL      116
#>     ---                                                                     
#> 336772:           1634        NA      9E   3393    <NA>    JFK  DCA       NA
#> 336773:           2312        NA      9E   3525    <NA>    LGA  SYR       NA
#> 336774:           1330        NA      MQ   3461  N535MQ    LGA  BNA       NA
#> 336775:           1344        NA      MQ   3572  N511MQ    LGA  CLE       NA
#> 336776:           1020        NA      MQ   3531  N839MQ    LGA  RDU       NA
#>         distance hour minute           time_hour
#>      1:     1400    5     15 2013-01-01 05:00:00
#>      2:     1416    5     29 2013-01-01 05:00:00
#>      3:     1089    5     40 2013-01-01 05:00:00
#>      4:     1576    5     45 2013-01-01 05:00:00
#>      5:      762    6      0 2013-01-01 06:00:00
#>     ---                                         
#> 336772:      213   14     55 2013-09-30 14:00:00
#> 336773:      198   22      0 2013-09-30 22:00:00
#> 336774:      764   12     10 2013-09-30 12:00:00
#> 336775:      419   11     59 2013-09-30 11:00:00
#> 336776:      431    8     40 2013-09-30 08:00:00

Creating a prt object consisting of 2 partitions can for example be done as

flights <- as_prt(dat, n_chunks = 2L, dir = tempfile(tmpdir = tmp))
print(flights)
#> # A prt:        336,776 × 19
#> # Partitioning: [168,388, 168,388] rows
#>          year month   day dep_time sched_dep_time dep_delay arr_time
#>         <int> <int> <int>    <int>          <int>     <dbl>    <int>
#>       1  2013     1     1      517            515         2      830
#>       2  2013     1     1      533            529         4      850
#>       3  2013     1     1      542            540         2      923
#>       4  2013     1     1      544            545        -1     1004
#>       5  2013     1     1      554            600        -6      812
#>       …
#> 336,772  2013     9    30       NA           1455        NA       NA
#> 336,773  2013     9    30       NA           2200        NA       NA
#> 336,774  2013     9    30       NA           1210        NA       NA
#> 336,775  2013     9    30       NA           1159        NA       NA
#> 336,776  2013     9    30       NA            840        NA       NA
#> # … with 336,766 more rows, and 12 more variables: sched_arr_time <int>,
#> #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>, origin <chr>,
#> #   dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>,
#> #   time_hour <dttm>

This simply splits rows of dat into 2 equally sized groups, preserving the original row ordering and writes each group to its own fst file. Depending on the types of queries that are most frequently run against the data, this naive partitioning might not be optimal. While fst does provide random row access, row selection is only possible via index ranges. Consequently, for each partition all rows that fall into the range between the minimum and the maximum required index will be read into memory and superfluous rows are discarded. If for example the data were to be most frequently accessed by airline, the resulting data loads would be more efficient if the data was already sorted by carrier codes.

dat <- data.table::setorderv(dat, "carrier")
grp <- cumsum(table(dat$carrier)) / nrow(dat) < 0.5
dat <- split(dat, grp[dat$carrier])

by_carrier <- as_prt(dat, dir = tempfile(tmpdir = tmp))
by_carrier
#> # A prt:        336,776 × 19
#> # Partitioning: [182,128, 154,648] rows
#>          year month   day dep_time sched_dep_time dep_delay arr_time
#>         <int> <int> <int>    <int>          <int>     <dbl>    <int>
#>       1  2013     1     1      557            600        -3      709
#>       2  2013     1     1      624            630        -6      909
#>       3  2013     1     1      632            608        24      740
#>       4  2013     1     1      809            815        -6     1043
#>       5  2013     1     1      811            815        -4     1006
#>       …
#> 336,772  2013     9    30     1955           2000        -5     2219
#> 336,773  2013     9    30     1956           1825        91     2208
#> 336,774  2013     9    30     2041           2045        -4     2147
#> 336,775  2013     9    30     2050           2045         5       20
#> 336,776  2013     9    30     2121           2100        21     2349
#> # … with 336,766 more rows, and 12 more variables: sched_arr_time <int>,
#> #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>, origin <chr>,
#> #   dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>,
#> #   time_hour <dttm>

The behavior of subsetting operations on prt objects is modeled after that of tibble objects. Columns can be extracted using [[, $ (with partial matching being disallowed), or by selecting a single column with [ and passing TRUE as drop argument.

str(flights[[1L]])
#>  int [1:336776] 2013 2013 2013 2013 2013 2013 2013 2013 2013 2013 ...
identical(flights[["year"]], flights$year)
#> [1] TRUE
identical(flights[["year"]], flights[, "year", drop = TRUE])
#> [1] TRUE
str(flights$yea)
#> Warning: Unknown or uninitialised column: `yea`.
#>  NULL

If the object resulting from the subsetting operation is two-dimensional, it is returned as data.table object. Apart form this distinction, again the intent is to replicate tibble behavior. One way in which tibble and data.frame do not behave in the same way is in default coercion to lower dimensions. The default value for the drop argument of [.data.frame is FALSE if only one row is returned but changes to TRUE where the result is a single column, while it is always FALSE for tibbles. A difference in behavior between data.table and tibble (any by extension prt) is a missing j argument: in the tibble (and in the data.frame) implementation, the i argument is then interpreted as column specification, whereas for data.frames, i remains a row selection.

datasets::mtcars[, "mpg"]
#>  [1] 21.0 21.0 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2 17.8 16.4 17.3 15.2 10.4
#> [16] 10.4 14.7 32.4 30.4 33.9 21.5 15.5 15.2 13.3 19.2 27.3 26.0 30.4 15.8 19.7
#> [31] 15.0 21.4
flights[, "dep_time"]
#>         dep_time
#>      1:      517
#>      2:      533
#>      3:      542
#>      4:      544
#>      5:      554
#>     ---         
#> 336772:       NA
#> 336773:       NA
#> 336774:       NA
#> 336775:       NA
#> 336776:       NA

jan_dt <- flights[flights$month == 1L, ]

jan_dt[1L]
#>    year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013     1   1      517            515         2      830            819
#>    arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1:        11      UA   1545  N14228    EWR  IAH      227     1400    5     15
#>              time_hour
#> 1: 2013-01-01 05:00:00
flights[1L]
#>         year
#>      1: 2013
#>      2: 2013
#>      3: 2013
#>      4: 2013
#>      5: 2013
#>     ---     
#> 336772: 2013
#> 336773: 2013
#> 336774: 2013
#> 336775: 2013
#> 336776: 2013

Deviation of prt subsetting behavior from that of tibble objects is most likely unintentional and bug reports are much appreciated as github issues.

The main feature of prt is the ability to load only a subset of a much larger tabular dataset and a useful function for selecting rows and columns of a table in a concise manner is the base R S3 generic function subset(). As such, a prt specific method is provided by this package. Using this functionality, above query for selecting all flights in January can be written as follows

identical(jan_dt, subset(flights, month == 1L))
#> [1] TRUE

To illustrate the importance of row-ordering consider the following small benchmark example: we subset on the carrier column, selecting only American Airlines flights. In one prt object, rows are ordered by carrier whereas in the other they are not, which will cause rows that are interleaved with those corresponding to AA flights to be read and discarded.

bench::mark(
  subset(flights, carrier == "AA"),
  subset(by_carrier, carrier == "AA")
)
#> Warning: Some expressions had a GC in every iteration; so filtering is disabled.
#> # A tibble: 2 x 6
#>   expression                             min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>                          <bch:> <bch:>     <dbl> <bch:byt>    <dbl>
#> 1 subset(flights, carrier == "AA")    92.6ms 97.7ms      9.41    56.5MB     16.9
#> 2 subset(by_carrier, carrier == "AA") 23.9ms 27.5ms     32.9     17.8MB     11.6

A common problem with non-standard evaluation (NSE) is potential ambiguity. Symbols in expressions passed as subset and select arguments are first resolved in the context of the data, followed by the environment the expression was created in (the quosure environment). Expressions are evaluated using rlang::eval_tidy(), which makes possible the distinction between symbols referring to the data mask from those referring to the expression environment. This can either be achieved using the .data and .env pronouns or by forcing parts of the expression.

month <- 1L
subset(flights, month == month, 1L:7L)
#>         year month day dep_time sched_dep_time dep_delay arr_time
#>      1: 2013     1   1      517            515         2      830
#>      2: 2013     1   1      533            529         4      850
#>      3: 2013     1   1      542            540         2      923
#>      4: 2013     1   1      544            545        -1     1004
#>      5: 2013     1   1      554            600        -6      812
#>     ---                                                          
#> 336772: 2013     9  30       NA           1455        NA       NA
#> 336773: 2013     9  30       NA           2200        NA       NA
#> 336774: 2013     9  30       NA           1210        NA       NA
#> 336775: 2013     9  30       NA           1159        NA       NA
#> 336776: 2013     9  30       NA            840        NA       NA

identical(jan_dt, subset(flights, month == !!month))
#> [1] TRUE
identical(jan_dt, subset(flights, .env$month == .data$month))
#> [1] TRUE

While in the above example it is fairly clear what is happening and it should come as no surprise that the symbol month cannot simultaneously refer to a value in the calling environment and the name of a column in the data mask, a more subtle issue is considered in the following example. The environment which takes precedence for evaluating the select argument is a named list of column indices. This makes it possible for example to specify a range of columns as (and makes the behavior of subset() being applied to a prt object consistent with that of a data.frame).

subset(flights, select = year:day)
#>         year month day
#>      1: 2013     1   1
#>      2: 2013     1   1
#>      3: 2013     1   1
#>      4: 2013     1   1
#>      5: 2013     1   1
#>     ---               
#> 336772: 2013     9  30
#> 336773: 2013     9  30
#> 336774: 2013     9  30
#> 336775: 2013     9  30
#> 336776: 2013     9  30

Now recall that symbols that cannot be resolved in this data environment will be looked up in the calling environment. Therefore the following effect, while potentially unintuitive, can easily be explained. Again, the .data and .env pronouns can be used to resolve potential issues.

sched_dep_time <- "dep_time"
colnames(subset(flights, select = sched_dep_time))
#> [1] "sched_dep_time"

actual_dep_time <- "dep_time"
colnames(subset(flights, select = actual_dep_time))
#> [1] "dep_time"

colnames(subset(flights, select = .env$sched_dep_time))
#> [1] "dep_time"
colnames(subset(flights, select = .env$actual_dep_time))
#> [1] "dep_time"
colnames(subset(flights, select = .data$sched_dep_time))
#> [1] "sched_dep_time"
colnames(subset(flights, select = .data$actual_dep_time))
#> Error: Column `actual_dep_time` not found in `.data`

By default, subset expressions have to be evaluated on the entire dataset at once in order to be consistent with base R subset() for data.frames. Often times this is inefficient and this behavior can be modified using the part_saft argument. Consider the following query which selects all rows where the arrival delay is larger than the mean arrival delay. Obviously an expression like this can yield different results depending on whether it is evaluated on individual partitions or over the entire data. Other queries such as the one above where we threshold on a fixed value, however can safely be evaluated on partitions individually.

is_true <- function(x) !is.na(x) & x
expr <- quote(is_true(arr_delay > mean(arr_delay, na.rm = TRUE)))
nrow(subset_quo(flights, expr, part_safe = FALSE))
#> [1] 105827
nrow(subset_quo(flights, expr, part_safe = TRUE))
#> [1] 104752

As an aside, in addition to subset(), which creates quosures from the expressions passed as subset and select, (using rlang::enquo()) the function subset_quo() which operates on already quoted expressions is exported as well. Thanks to the double curly brace forwarding operator introduced in rlang 0.4.0, this escape-hatch mechanism however is of lesser importance.

col_safe_subset <- function(x, expr, cols) {
  stopifnot(is_prt(x), is.character(cols))
  subset(x, {{ expr }}, .env$cols)
}

air_time <- c("dep_time", "arr_time")
col_safe_subset(flights, month == 1L, air_time)
#>        dep_time arr_time
#>     1:      517      830
#>     2:      533      850
#>     3:      542      923
#>     4:      544     1004
#>     5:      554      812
#>    ---                  
#> 27000:       NA       NA
#> 27001:       NA       NA
#> 27002:       NA       NA
#> 27003:       NA       NA
#> 27004:       NA       NA

In addition to subsetting, concise and informative printing is another area which effort ha been put into. Inspired by (and liberally borrowing code from) tibble, the print() method of fst objects adds the data.table approach of showing both the first and last n rows of the table in question. This functionality can be used by other classes used to represent tabular data, as the function trunc_dt() driving this is exported. All that is required are implementations of the base S3 generic functions dim(), head(), tail() and of course print().

new_tbl <- function(...) structure(list(...), class = "my_tbl")

dim.my_tbl <- function(x) {
  rows <- unique(lengths(x))
  stopifnot(length(rows) == 1L)
  c(rows, length(x))
}
head.my_tbl <- function(x, n = 6L, ...) {
  as.data.frame(lapply(x, `[`, seq_len(n)))
}
tail.my_tbl <- function(x, n = 6L, ...) {
  as.data.frame(lapply(x, `[`, seq(nrow(x) - n + 1L, nrow(x))))
}
print.my_tbl <- function(x, ..., n = NULL, width = NULL, n_extra = NULL) {
  out <- format(trunc_dt(x, n = n, width = width, n_extra = n_extra))
  cat(paste0(out, "\n"), sep = "")
  invisible(x)
}
new_tbl(a = letters, b = 1:26)
#> # Description: my_tbl
#>    a         b
#>    <chr> <int>
#>  1 a         1
#>  2 b         2
#>  3 c         3
#>  4 d         4
#>  5 e         5
#>  …  
#> 22 v        22
#> 23 w        23
#> 24 x        24
#> 25 y        25
#> 26 z        26
#> # … with 16 more rows

Similarly, the function glimpse_dt() which can be used to implement a class-specific function for the tibble S3 generic tibble::glimpse(). In order to customize the text description of the object a class-specific function for the tibble S3 generic tibble::tbl_sum() can be provided.

unlink(tmp, recursive = TRUE)