From 04b86971f988df44e35b65bdba8b5d1c5e76e478 Mon Sep 17 00:00:00 2001
From: evalparse
Date: Sun, 15 Dec 2019 03:39:20 +1100
Subject: [PATCH 1/6] up

---
 R/map.r                      |   2 -
 R/two-stage-verbs.R          |   2 +-
 book/01-intro.Rmd            |   5 +-
 book/02-intro-disk-frame.Rmd | 119 ++++----------
 book/03-concepts.Rmd         |  17 +-
 book/04-ingesting-data.Rmd   |  11 +-
 book/06-vs-dask-juliadb.Rmd  |  18 +--
 book/08-more-epic.Rmd        |  14 +-
 book/10-group-by.Rmd         | 301 +++++++++++++++++++++++++++++++++--
 man/map.Rd                   |   3 -
 10 files changed, 360 insertions(+), 132 deletions(-)

diff --git a/R/map.r b/R/map.r
index cc6f0c5d..60a09ea5 100644
--- a/R/map.r
+++ b/R/map.r
@@ -164,7 +164,6 @@ map_dfr.disk.frame <- function(.x, .f, ..., .id = NULL, use.names = fill, fill =
#'
#' # clean up cars.df
#' delete(cars.df)
-#' @rdname map
imap <- function(.x, .f, ...) {
  UseMethod("imap")
}
@@ -269,7 +268,6 @@ delayed <- function(.x, .f, ...) {
}

#' @export
-#' @rdname map
delayed.disk.frame <- function(.x, .f, ...) {
  map.disk.frame(.x, .f, ..., lazy = TRUE)
}
diff --git a/R/two-stage-verbs.R b/R/two-stage-verbs.R
index d90719f5..3a93af53 100644
--- a/R/two-stage-verbs.R
+++ b/R/two-stage-verbs.R
@@ -163,7 +163,7 @@ IQR.collected_agg.disk.frame <- function(listx, ...) {

#' A function to parse the summarize function
#' @importFrom dplyr filter select pull
-#' @imporFrom purr map_dfr
+#' @importFrom purrr map_dfr
#' @export
summarise.grouped_disk.frame <- function(.data, ...) {
  code = substitute(list(...))[-1]
diff --git a/book/01-intro.Rmd b/book/01-intro.Rmd
index 7ea73dd4..d0df5a20 100644
--- a/book/01-intro.Rmd
+++ b/book/01-intro.Rmd
@@ -8,9 +8,10 @@ vignette: >
  %\VignetteEncoding{UTF-8}
---

-# The story of how `disk.frame` came to be
+# The story of how `{disk.frame}` came to be

I was working at one of Australia's biggest banks, and their shiny new SAS server was experiencing severe instability issues. As a result, we had to run SAS on our laptops to perform huge amounts of data manipulation. A simple SQL query could take around 20 minutes. I had enough.

-That's why I created `disk.frame` - a larger-than-RAM data manipulation framework for R. The same query now only takes 10 seconds. \ No newline at end of file
+That's why I created `{disk.frame}` - a larger-than-RAM data manipulation framework for R. The same query now takes only 10 seconds.
+
diff --git a/book/02-intro-disk-frame.Rmd b/book/02-intro-disk-frame.Rmd
index 2f101f45..26800779 100644
--- a/book/02-intro-disk-frame.Rmd
+++ b/book/02-intro-disk-frame.Rmd
@@ -152,7 +152,7 @@ The class of `flights.df1` is also a `disk.frame` after the `dplyr::select` tr
For lazily constructed `disk.frame`s (e.g. `flights.df1`), the function `collect` can be used to bring the results from disk into R, e.g.

```{r, dependson='dfselect'}
-collect(flights.df1) %>% head
+collect(flights.df1) %>% head(2)
```

Of course, for larger-than-RAM datasets, one wouldn't call `collect` on the whole `disk.frame` (why would you need `disk.frame` otherwise?). More likely, one would call `collect` on a `filter`ed dataset or one summarized with `group_by`.
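To make the `filter`-then-`collect` pattern concrete, here is a minimal sketch. It assumes the `flights.df` disk.frame created earlier in this chapter; the 120-minute cutoff is purely illustrative.

```r
# the pipeline stays lazy until collect(); only the filtered rows are
# materialized in RAM as a single data frame
late_flights <- flights.df %>%
  filter(dep_delay > 120) %>% # recorded, not yet executed
  collect()

nrow(late_flights) # the collected subset is small enough to inspect in memory
```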
@@ -160,11 +160,11 @@ Of course, for larger-than-RAM datasets, one wouldn't call `collect` on the whol
Some examples of other dplyr verbs applied:

```{r, dependson='asdiskframe'}
-filter(flights.df, dep_delay > 1000) %>% collect %>% head
+filter(flights.df, dep_delay > 1000) %>% collect %>% head(2)
```

```{r, dependson='asdiskframe'}
-mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head
+mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head(2)
```

### Examples of NOT fully supported `dplyr` verbs

@@ -173,7 +173,7 @@ The `chunk_arrange` function arranges (sorts) each chunk but not the whole datas

```{r, dependson='asdiskframe'}
# this only sorts within each chunk
-chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head
+chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head(2)
```


@@ -204,7 +204,9 @@ rename
filter
chunk_arrange # within each chunk
chunk_group_by # within each chunk
-chunk_summarise/chunk_summarize # within each chunk
+chunk_summarize # within each chunk
+group_by # limited functions
+summarize # limited functions
mutate
transmute
left_join
@@ -213,93 +215,25 @@ full_join # careful. Performance!
semi_join
anti_join
```
+
## Sharding and distribution of chunks

Like other distributed data manipulation frameworks, `disk.frame` utilizes the *sharding* concept to distribute the data into chunks. For example, "to shard by `cust_id`" means that all rows with the same `cust_id` will be stored in the same chunk. This enables `chunk_group_by` by `cust_id` to produce the same results as non-chunked data.

The `by` variables that were used to shard the dataset are called the `shardkey`s. The *sharding* is performed by computing a deterministic hash on the shard keys (the `by` variables) for each row. The hash function produces an integer between `1` and `n`, where `n` is the number of chunks.

-## Grouping
-
-The `disk.frame` implements the `chunk_group_by` operation with a significant caveat. In the `disk.frame` framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve group by across chunk we need to put **all rows with the same group keys into the same file chunk**; this can be achieved with `hard_group_by`. However, the `hard_group_by` operation can be **VERY TIME CONSUMING** computationally and should be **avoided** if possible.
-
-The `hard_group_by` operation is best illustrated with an example, suppose a `disk.frame` has three chunks
-```
-# chunk1 = 1.fst
-# id n
-#1 a 1
-#2 a 2
-#3 b 3
-#4 d 4
-
-# chunk2 = 2.fst
-# id n
-#1 a 4
-#2 a 5
-#3 b 6
-#4 d 7
-
-# chunk3 = 3.fst
-# id n
-#1 a 4
-#2 b 5
-#3 c 6
-```
-and notice that the `id` column contains 3 distinct values `"a"`,`"b"`, and `"c"`. To perform `hard_group_by(df, by = id)` MAY give you the following `disk.frame` where all the `id`s with the same values end up in the same chunks.
-
-```
-# chunk1 = 1.fst
-# id n
-#1 b 3
-#2 b 6
-
-# chunk2 = 2.fst
-# id n
-#1 c 6
-#2 d 4
-#3 d 7
-
-# chunk3 = 3.fst
-# id n
-#1 a 1
-#2 a 2
-#3 a 4
-#4 a 5
-#5 a 4
-```
-
-Also, notice that there is no guaranteed order for the distribution of the `id`s to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that `id` does not follow a skewed distribution i.e. where a few distinct values make up the majority of the rows.

+## Group-by

-Typically, `chunk_group_by` is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the `by` variables beforehand; however, if this is not the case then one may need a second stage aggregation to obtain the correct result, see *Two-stage group by*.
-
-By forcing the user to choose `chunk_group_by` (within each chunk) and `hard_group_by` (across all chunks), this ensures that the user is conscious of the choice they are making. In `sparklyr` the equivalent of a `hard_group_by` is performed, which we should avoid, where possible, as it is time-consuming and expensive. Hence, `disk.frame` has chosen to explain the theory and allow the user to make a conscious choice when performing `group_by`.
+
+`{disk.frame}` implements the `group_by` operation with some caveats. In the `{disk.frame}` framework, only a set of functions is supported in `summarize`; however, the user can define additional custom group-by functions. For more information see the [group-by chapter](10-group-by.Rmd).

```{r, dependson='asdiskframe'}
flights.df %>%
-  hard_group_by(carrier) %>% # notice that hard_group_by needs to be set
-  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
+  group_by(carrier) %>% # one-stage group_by; no hard_group_by needed
+  summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
  collect %>%
  arrange(carrier)
```

-### Two-stage group by
-For most group-by tasks, the user can achieve the desired result WITHOUT using `hard = TRUE` by performing the group by in two stages. For example, suppose you aim to count the number of rows group by `carrier`, you can set `hard = F` to find the count within each chunk and then use a second group-by to summaries each chunk's results into the desired result. For example,
-
-```{r, dependson='asdiskframe'}
-flights.df %>%
-  chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
-  chunk_summarize(count = n()) %>% # mean follows normal R rules
-  collect %>% # collect each individul chunks results and row-bind into a data.table
-  group_by(carrier) %>%
-  summarize(count = sum(count)) %>%
-  arrange(carrier)
-```
-
-Because this two-stage approach avoids the expensive `hard group_by` operation, it is often significantly faster. However, it can be tedious to write; and this is a con of the `disk.frame` chunking mechanism.
-
-*Note*: this two-stage approach is similar to a map-reduce operation.
-
-
## Restrict input columns for faster processing

One can restrict which input columns to load into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the `srckeep` function, which only accepts column names as a string vector.

@@ -307,8 +241,8 @@ One can restrict which input columns to load into memory for each chunk; this ca
```{r, dependson='asdiskframe'}
flights.df %>%
  srckeep(c("carrier","dep_delay")) %>%
-  hard_group_by(carrier) %>%
-  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
+  group_by(carrier) %>%
+  summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
  collect
```

@@ -352,21 +286,35 @@ flights.df %>%
## Window functions and arbitrary functions

-`disk.frame` supports all `data.frame` operations, unlike Spark which can only perform those operations that Spark has implemented. Hence windowing functions like `rank` are supported out of the box.
+`{disk.frame}` supports all `data.frame` operations, unlike Spark which can only perform those operations that Spark has implemented. Hence windowing functions like `min_rank` and `rank` are supported out of the box.
+
+For the following examples, we will use `hard_group_by`, which performs a group-by and also reorganises the chunks so that all records with the same `year`, `month`, and `day` end up in the same chunk. This is typically not advised, as `hard_group_by` can be slow for large datasets.

```{r, dependson='asdiskframe'}
# Find the most and least delayed flight each day
bestworst <- flights.df %>%
  srckeep(c("year","month","day", "dep_delay")) %>%
-  chunk_group_by(year, month, day) %>%
-  select(dep_delay) %>%
+  hard_group_by(c("year", "month", "day")) %>%
  filter(dep_delay == min(dep_delay, na.rm = T) | dep_delay == max(dep_delay, na.rm = T)) %>%
  collect
-
bestworst %>% head
```

+Another example:
+
+```{r, dependson='asdiskframe'}
+ranked <- flights.df %>%
+  srckeep(c("year","month","day", "dep_delay")) %>%
+  hard_group_by(c("year", "month", "day")) %>%
+  filter(min_rank(desc(dep_delay)) <= 2 & dep_delay > 0) %>%
+  collect
+
+ranked %>% head
+```
+
+One more example:
+
```{r, dependson='asdiskframe'}
# Rank each flight within each day
ranked <- flights.df %>%
@@ -381,6 +329,7 @@ ranked %>% head

## Arbitrary by-chunk processing

+
One can apply arbitrary transformations to each chunk of the `disk.frame` by using the `delayed` function, which evaluates lazily, or the `map.disk.frame(lazy = F)` function, which evaluates eagerly. For example, to return the number of rows in each chunk:

```{r, dependson='asdiskframe'}
@@ -401,7 +350,7 @@ flights.df2 <- map(flights.df, ~.x[1:10,], lazy = F, outdir = file.path(tempdir(
flights.df2 %>% head
```

-Notice `disk.frame` supports the `purrr` syntax for defining a function using `~`.
+Notice `{disk.frame}` supports the `purrr` syntax for defining a function using `~`.

## Sampling

diff --git a/book/03-concepts.Rmd b/book/03-concepts.Rmd
index d5940f59..8232f06e 100644
--- a/book/03-concepts.Rmd
+++ b/book/03-concepts.Rmd
@@ -1,5 +1,5 @@
---
-title: "Key disk.frame concepts"
+title: "Key `{disk.frame}` concepts"
author: "ZJ"
output: rmarkdown::html_vignette
vignette: >
@@ -15,15 +15,18 @@ knitr::opts_chunk$set(
)
```

+# Key `{disk.frame}` concepts
There are a number of concepts and terminologies that are useful to understand in order to use `disk.frame` effectively.

## What is a `disk.frame` and what are chunks?
+
-A `disk.frame` is nothing more a folder and in that folder there should be [`fst`](https://www.fstpackage.org/) files named "1.fst", "2.fst", "3.fst" etc. Each of the ".fst" file is called a _chunk_.
+A `disk.frame` is a folder containing [`fst`](https://www.fstpackage.org/) files named "1.fst", "2.fst", "3.fst" etc. Each of the ".fst" files is called a _chunk_.

## Workers and parallelism
+
Parallelism in `disk.frame` is achieved using the [`future` package](https://cran.r-project.org/package=future). When performing many tasks, `disk.frame` uses multiple workers, where each _worker_ is an R session, to perform the tasks in parallel.

-It is recommended that you should running these to set-up immediately after you `library(disk.frame)`. For example:
+It is recommended that you run the following immediately after `library(disk.frame)` to set up multiple workers. For example:
```r
library(disk.frame)
@@ -55,4 +58,12 @@ To see how many workers are at work, use
future::nbrOfWorkers()
```

+## How `{disk.frame}` works
+
+When `df %>% some_fn %>% collect` is called, `some_fn` is applied to each chunk of `df`. The `collect` will row-bind the results of `some_fn(chunk)` together if the returned value of `some_fn` is a data.frame, or it will return a `list` containing the results of `some_fn` otherwise.
+
+The session that receives these results is called the **main session**. In general, we should try to minimise the amount of data passed from the worker sessions back to the main session, because passing data around can be slow.
+
+Also, please note that there is no communication between the workers, except for workers passing data back to the main session.
+

diff --git a/book/04-ingesting-data.Rmd b/book/04-ingesting-data.Rmd
index d26c433a..541d870a 100644
--- a/book/04-ingesting-data.Rmd
+++ b/book/04-ingesting-data.Rmd
@@ -15,7 +15,12 @@ knitr::opts_chunk$set(
)
```

-Let's set-up `disk.frame`
+# Ingesting Data
+
+One of the most important tasks to perform before using the `{disk.frame}` package is to make some `disk.frame`s! There are a few functions to help you do that. Before we do, we set up `{disk.frame}` as usual.
+
+**Setting up**
+
```r
library(disk.frame)

@@ -31,10 +36,8 @@ if(interactive()) {
}
```

-One of the most important tasks to perform before using the `disk.frame` package is to make some `disk.frame`s! There are a few functions to help you do that.
-
## Convert a `data.frame` to `disk.frame`
-Firstly there is `as.disk.frame()` which allows you to make a `disk.frame` from a `data.frame`, e.g.
+Firstly, there is `as.disk.frame()`, which allows you to make a `disk.frame` from a `data.frame`, e.g.

```r
flights.df = as.disk.frame(nycflights13::flights)
diff --git a/book/06-vs-dask-juliadb.Rmd b/book/06-vs-dask-juliadb.Rmd
index d3376f47..15ffd10d 100644
--- a/book/06-vs-dask-juliadb.Rmd
+++ b/book/06-vs-dask-juliadb.Rmd
@@ -15,7 +15,7 @@ knitr::opts_chunk$set(
)
```

-## Intro
+# Intro - Benchmark 1

This is the first in a series to benchmark the performance of disk.frame vs other medium-data tools. For Python, we will benchmark Dask, and for Julia, we will benchmark JuliaDB.jl. In the process, I will give a warts-and-all account of the tools I have tested.

@@ -53,7 +53,7 @@ library(ggplot2)
ggplot(df) +
  geom_bar(aes(x = tool, weight = timing), stat = "count") +
  ylab("seconds") +
-  ggtitle("Count(*) gorup-by timings")
+  ggtitle("Count(*) group-by timings")
```

## Data

@@ -110,19 +110,17 @@ We can inspect the result as well.
summ
```

-Another way to perform the analysis is to use `dplyr` syntax to perform a two-stage "group-by" which is:
+Another way to perform the analysis is to use `dplyr` syntax to perform the group-by in _one stage_:

-```r
-df1 %>%
+```{r, dependson='convert'}
+system.time(df1 %>%
  srckeep("V1") %>%
-  chunk_group_by(V1) %>%
-  chunk_summarise(N = n()) %>%
-  collect %>%
  group_by(V1) %>%
-  summarise(N = sum(N)) %>%
+  summarise(N = n()) %>%
+  collect)
```

-However, the `dplyr` syntax tends to be slightly slower than using data.table syntax.
+However, the `dplyr` syntax tends to be slightly slower than using data.table syntax. This may improve in future releases, as much of the overhead is due to inefficient use of non-standard evaluation (NSE).
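For reference, the data.table-syntax counterpart would be a two-stage count along these lines (a sketch only, assuming the `df1` disk.frame from above; `V1` is the grouping column and `.N` is data.table's row counter):

```r
library(data.table)

# stage 1: disk.frame applies the data.table expression to each chunk
# and row-binds the per-chunk counts into a single data.table
per_chunk <- df1[, .(N = .N), by = V1]

# stage 2: aggregate the per-chunk counts into the final result
per_chunk[, .(N = sum(N)), by = V1]
```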
#### Dask

diff --git a/book/08-more-epic.Rmd b/book/08-more-epic.Rmd
index 2b2ee631..0c5e86e0 100644
--- a/book/08-more-epic.Rmd
+++ b/book/08-more-epic.Rmd
@@ -201,7 +201,7 @@ ggplot(data.frame(

## Can {disk.frame} be even more "epic"?

-Well yes! We can actually speed up the group-by operation that Bruno did by using `srckeep`. The use of `srckeep` can't be emphasized enough! It works by reading from disk only the columns needed for the analysis, and hence disk IO time is (drastically) reduced! However, we do have to live with the two-stage group-by annoyance for now.
+Well yes! We can actually speed up the group-by operation that Bruno did by using `srckeep`. The use of `srckeep` can't be emphasized enough! It works by reading from disk only the columns needed for the analysis, and hence disk IO time is (drastically) reduced!

```r
tic = Sys.time()

-# doing group-by in two-stages which is annoying; I am working on something better
mean_dep_delay <- flights.df %>%
  srckeep(c("YEAR", "MONTH", "DAY_OF_MONTH", "DEP_DELAY")) %>%
-  chunk_group_by(YEAR, MONTH, DAY_OF_MONTH) %>%
-  chunk_summarise(sum_delay = sum(DEP_DELAY, na.rm = TRUE), n = n()) %>%
-  collect() %>%
  group_by(YEAR, MONTH, DAY_OF_MONTH) %>%
-  summarise(mean_delay = sum(sum_delay)/sum(n))
+  summarise(mean_delay = mean(DEP_DELAY, na.rm = TRUE))
(toc = Sys.time() - tic)
#> Time difference of 2.800005 secs
```

Compare the above to the timing without `srckeep`:

```r
tic = Sys.time()
mean_dep_delay <- flights.df %>%
-  chunk_group_by(YEAR, MONTH, DAY_OF_MONTH) %>%
-  chunk_summarise(sum_delay = sum(DEP_DELAY, na.rm = TRUE), n = n()) %>%
-  collect() %>%
  group_by(YEAR, MONTH, DAY_OF_MONTH) %>%
-  summarise(mean_delay = sum(sum_delay)/sum(n))
+  summarise(mean_delay = mean(DEP_DELAY, na.rm = TRUE))
(toc = Sys.time() - tic)
#> Time difference of 15.62312 secs
```

@@ -248,7 +242,7 @@ ggplot2::ggplot(data1) +
So there you go! {disk.frame} can be even more "epic"! Here are the two main take-aways

1. Load CSV data as many individual files, if possible, to take advantage of multi-core parallelism
-2. `srckeep` is your friend! Disk IO is often the bottleneck in data manipulation, and you can reduce Disk IO by specifying only columns that you will use with `srckeep(c(columns1, columns2, ...))`.
+2. `srckeep` is your friend! Disk IO is often the bottleneck in data manipulation, and you can reduce disk IO by specifying only the columns that you will use with `srckeep(c(columns1, columns2, ...))`.

## Advertisements

diff --git a/book/10-group-by.Rmd b/book/10-group-by.Rmd
index 93eecb08..618a7745 100644
--- a/book/10-group-by.Rmd
+++ b/book/10-group-by.Rmd
@@ -15,8 +15,23 @@ knitr::opts_chunk$set(
)
```

-### Group by
-Starting from {disk.frame} v0.2.2, there is for support `group_by` for a limited set of functions. For example:
+# Group-by in `{disk.frame}`
+
+The group-by framework of [`{disk.frame}`](https://diskframe.com) has been overhauled in v0.2.2. It is now able to perform some group-by-summarize operations in one stage. In this chapter we will cover:
+
+1. How to use one-stage group-by
+2. Manual two-stage group-by and hard group-by
+3. The architecture of `{disk.frame}` and its implications for group-by
+4. How to define custom one-stage group-by functions and their limitations
+
+
+## One-stage Group-by
+
+A one-stage group-by behaves the same as a group-by on a data.frame. This is remarkable given the limitations imposed by the disk-based nature of `{disk.frame}`. Before v0.2.2 of `{disk.frame}`, one-stage group-by was not possible, and users had to rely on two-stage group-by even for simple operations like `mean`.
+
+However, now that one-stage group-by is possible, there are still limitations, and not all functions are supported out-of-the-box. Hence, at the end of the chapter we describe [how to define custom one-stage group-by functions](#custom-one-stage-group-by).
+
+An example of one-stage group-by:

```r
result_from_disk.frame = iris %>%
@@ -35,11 +50,11 @@ result_from_disk.frame = iris %>%
  collect
```

-The results should be exactly the same as if applying the same group-by operations on a data.frame. If not then please [report a bug](https://github.com/xiaodaigh/disk.frame/issues).
+It is important to note that not all functions that can run in a `data.frame` `summarize` will work automatically. This is because of how `{disk.frame}` works. Please see the section on [defining your own one-stage group-by](#how-to-define-your-own-one-stage-group-by-function) if you wish to learn how to define your own one-stage group-by functions.

-#### List of supported group-by functions
+### List of supported group-by functions

-If a function you like is missing, please make a feature request [here](https://github.com/xiaodaigh/disk.frame/issues). It is a limitation that function that depend on the order a column can only obtained using estimated methods.
+If a function you need/like is missing, please make a feature request [here](https://github.com/xiaodaigh/disk.frame/issues). It is a limitation that functions that depend on the ordering of a column can only be obtained using estimation methods.

| Function | Exact/Estimate | Notes |
| -- | -- | -- |
| `min` | Exact | |
| `max` | Exact | |
| `mean` | Exact | |
| `sum` | Exact | |
| `length` | Exact | |
| `n` | Exact | |
| `n_distinct` | Exact | |
| `sd` | Exact | |
| `var` | Exact | `var(x)` only; `var(x, y)` not supported |
| `any` | Exact | |
| `all` | Exact | |
| `median` | Estimate | |
| `quantile` | Estimate | One quantile only |
| `IQR` | Estimate | |

+### Notes on one-stage group-by
+
+The results should be exactly the same as if applying the same group-by operations on a `data.frame`. If not, then please [report a bug](https://github.com/xiaodaigh/disk.frame/issues).
+
+
+## Group-by notes
+
+`{disk.frame}` implements the `chunk_group_by` operation with a significant caveat. In the `disk.frame` framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve a group-by across chunks, we need to put **all rows with the same group keys into the same file chunk**; this can be achieved with `hard_group_by`. However, the `hard_group_by` operation can be **VERY TIME CONSUMING** computationally and should be **avoided** if possible.
+
+The `hard_group_by` operation is best illustrated with an example. Suppose a `disk.frame` has three chunks
+```
+# chunk1 = 1.fst
+# id n
+#1 a 1
+#2 a 2
+#3 b 3
+#4 d 4
+
+# chunk2 = 2.fst
+# id n
+#1 a 4
+#2 a 5
+#3 b 6
+#4 d 7
+
+# chunk3 = 3.fst
+# id n
+#1 a 4
+#2 b 5
+#3 c 6
+```
+and notice that the `id` column contains 4 distinct values: `"a"`, `"b"`, `"c"`, and `"d"`. Performing `hard_group_by(df, by = id)` MAY give you the following `disk.frame`, where all the rows with the same `id` value end up in the same chunk.
+
+```
+# chunk1 = 1.fst
+# id n
+#1 b 3
+#2 b 6
+
+# chunk2 = 2.fst
+# id n
+#1 c 6
+#2 d 4
+#3 d 7
+
+# chunk3 = 3.fst
+# id n
+#1 a 1
+#2 a 2
+#3 a 4
+#4 a 5
+#5 a 4
+```
+
+Also, notice that there is no guaranteed order for the distribution of the `id`s to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that `id` does not follow a skewed distribution, i.e. where a few distinct values make up the majority of the rows.
+
+Typically, `chunk_group_by` is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the `by` variables beforehand; however, if this is not the case then one may need a second-stage aggregation to obtain the correct result, see *Two-Stage Group-by* below.
+
+By forcing the user to choose between `chunk_group_by` (within each chunk) and `hard_group_by` (across all chunks), this ensures that the user is conscious of the choice they are making. In `sparklyr` the equivalent of a `hard_group_by` is performed, which we should avoid, where possible, as it is time-consuming and expensive. Hence, `disk.frame` has chosen to explain the theory and allow the user to make a conscious choice when performing `group_by`.
+
+```{r, dependson='asdiskframe'}
+flights.df %>%
+  hard_group_by(carrier) %>% # notice that hard_group_by is used here
+  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
+  collect %>%
+  arrange(carrier)
+```
+
+## Two-Stage Group-by
+
+Prior to `{disk.frame}` v0.2.2, there was no general support for one-stage group-by, so a two-stage style of group-by was needed. The key is to understand that `chunk_group_by` performs the group-by within each chunk.
+
+For most group-by tasks, the user can achieve the desired result WITHOUT `hard_group_by` by performing the group-by in two stages. For example, suppose you aim to count the number of rows grouped by `carrier`: first find the count within each chunk, and then use a second group-by to aggregate each chunk's results into the desired result. For example,
+
+```{r, dependson='asdiskframe'}
+flights.df %>%
+  chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
+  chunk_summarize(count = n()) %>% # count within each chunk
+  collect %>% # collect each individual chunk's results and row-bind into a data.table
+  group_by(carrier) %>%
+  summarize(count = sum(count)) %>%
+  arrange(carrier)
+```
+
+Because this two-stage approach avoids the expensive `hard_group_by` operation, it is often significantly faster. However, it can be tedious to write, and this is a con of the `disk.frame` chunking mechanism.
+
+*Note*: this two-stage approach is similar to a map-reduce operation.

```{r setup, cache=TRUE}
-knitr::opts_chunk$set(include = FALSE)
suppressPackageStartupMessages(library(disk.frame))
setup_disk.frame()
```
@@ -101,8 +200,9 @@ cat("filtering a < 0.1 took: ", data.table::timetaken(pt), "\n")
nrow(df_filtered)
```

-### Hard group by
-Another way to perform a one-stage `group_by` is to perform a `hard_group_by` on a `disk.frame. This will rechunk the `disk.frame` by the by columns. This is **not** recommended for performance reasons, as it can quite slow to rechunk the chunks on disk.
+## Hard group-by
+
+Another way to perform a one-stage `group_by` is to perform a `hard_group_by` on a `disk.frame`. This will rechunk the `disk.frame` by the by-columns. This is **not** recommended for performance reasons, as it can be quite slow to rechunk the file chunks on disk.
```{r}
pt = proc.time()
res1 <- flights.df %>%
@@ -112,14 +212,191 @@ res1 <- flights.df %>%
  hard_group_by(qtr) %>% # hard group_by is MUCH SLOWER but avoids a 2nd-stage aggregation
  chunk_summarise(avg_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect
-cat("group by took: ", data.table::timetaken(pt), "\n")
+cat("group-by took: ", data.table::timetaken(pt), "\n")

collect(res1)
```
+
+## Custom one-stage group-by
+
+### At a glance
+
+`{disk.frame}` allows the user to create custom one-stage group-by functions. To make a function `fn` work in one stage, one needs to define two functions:
+
+1. `fn.chunk_agg.disk.frame`, which is applied to each chunk
+2. `fn.collected_agg.disk.frame`, which accepts a `list` of the returns from `fn.chunk_agg.disk.frame` and finalizes the computation
+
+For example, to make `mean` a one-stage group-by function, `{disk.frame}` has defined `mean.chunk_agg.disk.frame` and `mean.collected_agg.disk.frame`, which we will illustrate with examples below.
+
+But first, we shall explain some theory behind `{disk.frame}` to help you better understand "why does `{disk.frame}` do it like that?".
+
+### How does `{disk.frame}` work?
+
+One may ask: why are only a few functions supported for one-stage group-by? And why do some functions, like `median`, only produce estimates instead of the exact figure? To answer these questions, we need an understanding of how `{disk.frame}` works.
+
+A `disk.frame` is organized as chunks stored on disk. Each chunk is a file stored in [fst format](https://www.fstpackage.org/). The [`{future}` package](https://cran.r-project.org/web/packages/future/index.html) is used to apply the same function to each chunk; each of these operations is carried out in a separate R session. These R sessions cannot communicate with each other during the execution of the operations.
+
+Once the operation has been performed, the results are brought back to the session from which the operation was called. This is the only point of interprocess communication. Making group-by work in one stage does require some additional work.
+
+To summarize, the two phases of a `df %>% some_fn %>% collect` operation are:
+
+1. `some_fn` is applied to each chunk, and the result is assumed to be a data.frame
+2. `collect` then row-binds (`rbind`/`bind_rows`/`rbindlist`) the results together to form a data.frame in the main session
+
+### How group-by works
+
+Except for passing the result back to the main session, communication between worker sessions is not allowed. This limits how group-by operations can be performed, which is why group-by is done in two stages for many functions. However, R's meta-programming abilities allow us to rewrite code so that it automatically performs the two-stage group-by. For example, consider:
+
+```r
+df %>%
+  group_by(grp1) %>%
+  summarize(sum(x)) %>%
+  collect
+```
+
+we can use meta-programming to transform that to
+
+```r
+df %>%
+  chunk_group_by(grp1) %>%
+  chunk_summarize(__tmp1__ = sum(x)) %>%
+  collect() %>%
+  group_by(grp1) %>%
+  summarize(x = sum(__tmp1__))
+```
+
+Basically, we are "compiling" one-stage group-by code to two-stage group-by code, and then executing it.
+
+For `mean`, it's trickier, as one needs to keep track of the numerator and the denominator separately in computing `mean(x) = sum(x)/length(x)`.
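For intuition, here is a minimal sketch (using hypothetical toy vectors standing in for two chunks) of how per-chunk partial sums and counts combine into the overall mean:

```r
# two toy "chunks" of the same column x
chunk1 <- c(1, 2, 3)
chunk2 <- c(4, 5)

# each worker returns its partial sum and count
partials <- list(
  c(sumx = sum(chunk1), lengthx = length(chunk1)),
  c(sumx = sum(chunk2), lengthx = length(chunk2))
)

# the main session combines them: overall mean = total sum / total count
sum(sapply(partials, `[[`, "sumx")) / sum(sapply(partials, `[[`, "lengthx"))
#> [1] 3  # identical to mean(c(chunk1, chunk2))
```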
+
+Therefore, `{disk.frame}` compiles
+
+```r
+df %>%
+  group_by(grp1) %>%
+  summarize(meanx = mean(x)) %>%
+  collect
+```
+
+to
+
+```r
+df %>%
+  chunk_group_by(grp1) %>%
+  chunk_summarize(__tmp1__ = list(mean.chunk_agg.disk.frame(x))) %>%
+  collect %>%
+  group_by(grp1) %>%
+  summarize(meanx = mean.collected_agg.disk.frame(__tmp1__))
+```
+
+where `mean.chunk_agg.disk.frame` defines what needs to be done to each chunk. As you can see below, its return value is a vector whose elements are named `sumx` and `lengthx`. Here is an example implementation of `mean.chunk_agg.disk.frame`:
+
+```r
+mean.chunk_agg.disk.frame <- function(x, na.rm = FALSE, ...) {
+  sumx = sum(x, na.rm = na.rm)
+  lengthx = length(x) - ifelse(na.rm, sum(is.na(x)), 0)
+  c(sumx = sumx, lengthx = lengthx)
+}
+```
+
+Because the return value is not a scalar, it needs to be wrapped in a `list` in the `chunk_summarize` call above.
+
+The `mean.collected_agg.disk.frame` receives a list of outputs from `mean.chunk_agg.disk.frame`. Recall that `mean.chunk_agg.disk.frame` returns a vector for each chunk, so the input to `mean.collected_agg.disk.frame` is a *list of vectors*:
+
+```r
+mean.collected_agg.disk.frame <- function(listx) {
+  sum(sapply(listx, function(x) x["sumx"]))/sum(sapply(listx, function(x) x["lengthx"]))
+}
+```
+
+### How to define your own one-stage group-by function
+
+Now that we have seen two examples, namely `sum` and `mean`, we are ready to summarize how group-by functions are implemented.
+
+Given the following:
+
+```r
+df %>%
+  group_by(grp1) %>%
+  summarize(namex = fn(x)) %>%
+  collect
+```
+
+`{disk.frame}` compiles it to
+
+```r
+df %>%
+  chunk_group_by(grp1) %>%
+  chunk_summarize(__tmp1__ = list(fn.chunk_agg.disk.frame(x))) %>%
+  collect %>%
+  group_by(grp1) %>%
+  summarize(namex = fn.collected_agg.disk.frame(__tmp1__))
+```
+
+Based on the above, to make `fn` a one-stage group-by function, the user has to:
+
+1. Define `fn.chunk_agg.disk.frame`, a function to be applied to each chunk
+2. Define `fn.collected_agg.disk.frame`, a function to be applied to *a `list` containing the returns of `fn.chunk_agg.disk.frame` applied to each chunk*
+
+**Example of implementing `sum`**:
+
+1. Define `sum.chunk_agg.disk.frame`
+
+```r
+sum.chunk_agg.disk.frame <- function(x, na.rm = FALSE) {
+  sum(x, na.rm = na.rm)
+}
+```
+
+2. Define `sum.collected_agg.disk.frame`, which needs to accept a list of the per-chunk `sum(x, na.rm = na.rm)` results; each of those is just a numeric, so
+
+```r
+sum.collected_agg.disk.frame <- function(list_sum) {
+  sum(unlist(list_sum))
+}
+```
+
+**Example of implementing `n_distinct`**:
+
+The `n_distinct` function counts the number of distinct values in a vector `x`.
+
+1. Define `n_distinct.chunk_agg.disk.frame` to return the unique values within each chunk. Because the same value can appear in multiple chunks, to ensure that we don't double count, we simply return all the unique values from each chunk and deduplicate them in the next phase
+
+```r
+n_distinct.chunk_agg.disk.frame <- function(x, na.rm = FALSE) {
+  if(na.rm) {
+    setdiff(unique(x), NA)
+  } else {
+    unique(x)
+  }
+}
+```
+
+2. Define `n_distinct.collected_agg.disk.frame`, which deduplicates the chunk-wise unique values and counts them
+
+```r
+n_distinct.collected_agg.disk.frame <- function(list_of_chunkwise_uniques) {
+  dplyr::n_distinct(unlist(list_of_chunkwise_uniques))
+}
+```
+
+### Limitations
+
+We have seen that `{disk.frame}` performs operations in two phases:
+
+1. apply the same function to each chunk
+2. row-bind the results
+
+and there is no communication between the sessions that apply the functions at the chunk level.
+
+Hence, it is generally difficult to compute rank-based summarizations like `median` exactly, so most rank-based calculations are estimates only. This is also true of distributed data systems like Spark, whose median function likewise produces estimates.
+

## Advertisements

-### Interested in learning {disk.frame} in a structured course?
+### Interested in learning `{disk.frame}` in a structured course?

Please register your interest at:

diff --git a/man/map.Rd b/man/map.Rd
index 242a3bdc..018ec71f 100644
--- a/man/map.Rd
+++ b/man/map.Rd
@@ -15,7 +15,6 @@
\alias{lazy}
\alias{lazy.disk.frame}
\alias{delayed}
-\alias{delayed.disk.frame}
\alias{chunk_lapply}
\title{Apply the same function to all chunks}
\usage{
@@ -69,8 +68,6 @@ lazy(.x, .f, ...)

delayed(.x, .f, ...)

-\method{delayed}{disk.frame}(.x, .f, ...)
-
chunk_lapply(...)
}
\arguments{

From 6bbf4c6d5c1b621d03888c794ff39566c80439f9 Mon Sep 17 00:00:00 2001
From: evalparse
Date: Sun, 15 Dec 2019 03:44:39 +1100
Subject: [PATCH 2/6] update site

---
 R/dplyr_verbs.r                              | 65 +++++++++++++++++++-
 docs/404.html                                |  2 +-
 docs/LICENSE-text.html                       |  2 +-
 docs/articles/concepts.html                  | 28 +++++++--
 docs/articles/convenience-features.html      |  2 +-
 docs/articles/data-table-syntax.html         |  2 +-
 docs/articles/glm.html                       |  2 +-
 docs/articles/index.html                     |  4 +-
 docs/authors.html                            |  2 +-
 docs/index.html                              |  2 +-
 docs/reference/add_chunk.html                | 10 +--
 docs/reference/as.data.frame.disk.frame.html |  2 +-
 docs/reference/as.data.table.disk.frame.html |  2 +-
 docs/reference/as.disk.frame.html            |  2 +-
 docs/reference/collect.html                  |  2 +-
 docs/reference/colnames.html                 |  2 +-
 docs/reference/compute.disk.frame.html       |  2 +-
 docs/reference/create_dplyr_mapper.html      |  2 +-
 docs/reference/csv_to_disk.frame.html        |  2 +-
 docs/reference/delete.html                   |  2 +-
 docs/reference/df_ram_size.html              |  2 +-
 docs/reference/dfglm.html                    |  2 +-
 docs/reference/disk.frame.html               |  4 +-
 docs/reference/dplyr_verbs.html              |  2 +-
 docs/reference/evalparseglue.html            |  2 +-
 docs/reference/foverlaps.disk.frame.html     |  2 +-
 docs/reference/gen_datatable_synthetic.html  |  2 +-
 docs/reference/get_chunk.html                |  2 +-
 docs/reference/get_chunk_ids.html            | 14 ++---
 docs/reference/group_by.html                 |  2 +-
 docs/reference/groups.disk.frame.html        |  2 +-
 docs/reference/hard_arrange.html             |  2 +-
 docs/reference/hard_group_by.html            |  2 +-
 docs/reference/head_tail.html                |  2 +-
 docs/reference/index.html                    |  2 +-
 docs/reference/is_disk.frame.html            |  2 +-
 docs/reference/join.html                     |  2 +-
 docs/reference/make_glm_streaming_fn.html    |  2 +-
 docs/reference/map.html                      |  5 +-
 docs/reference/map2.html                     |  2 +-
 docs/reference/merge.disk.frame.html         |  2 +-
 docs/reference/move_to.html                  |  2 +-
 docs/reference/nchunks.html                  |  2 +-
 docs/reference/ncol_nrow.html                |  2 +-
 docs/reference/overwrite_check.html          |  2 +-
 docs/reference/print.disk.frame.html         |  2 +-
 docs/reference/rbindlist.disk.frame.html     |  2 +-
 docs/reference/rechunk.html                  |  4 +-
 docs/reference/recommend_nchunks.html        |  2 +-
 docs/reference/remove_chunk.html             |  8 +--
 docs/reference/sample.html                   |  2 +-
 docs/reference/setup_disk.frame.html         |  2 +-
 docs/reference/shard.html                    |  2 +-
 docs/reference/shardkey.html                 |  2 +-
 docs/reference/shardkey_equal.html           |  2 +-
 docs/reference/show_ceremony.html            |  2 +-
 docs/reference/srckeep.html                  |  2 +-
 docs/reference/sub-.disk.frame.html          |  2 +-
 docs/reference/tbl_vars.disk.frame.html      |  2 +-
 docs/reference/write_disk.frame.html         |  2 +-
 docs/reference/zip_to_disk.frame.html        |  4 +-
 utils/build_utils.R                          |  1 -
 62 files changed, 162 insertions(+), 87 deletions(-)

diff --git a/R/dplyr_verbs.r b/R/dplyr_verbs.r
index 94c0fe7c..e5d28c56 100644
--- a/R/dplyr_verbs.r
+++ b/R/dplyr_verbs.r
@@ -133,7 +133,70 @@ chunk_summarise <- create_chunk_mapper(dplyr::summarise)
#' @rdname dplyr_verbs
summarize.disk.frame <- function(.data, ...) {
  # comment summarize.grouped_disk.frame
-  stop("`summarize.disk.frame` has been removed. Please use `chunk_summarize` instead. This is in preparation for a more powerful `group_by` framework")
+  warning("`summarize.disk.frame`'s behaviour has changed. Please use `chunk_summarize` if you wish to apply `dplyr::summarize` to each chunk")
+  
+  stop("TODO: adapt this for no group-by")
+  code = substitute(list(...))[-1]
+  expr_id = 0
+  temp_varn = 0
+  #browser()
+  
+  list_of_chunk_agg_fns <- as.character(methods(class = "chunk_agg.disk.frame"))
+  list_of_collected_agg_fns <- as.character(methods(class = "collected_agg.disk.frame"))
+  
+  # generate the chunk_summarize_code
+  summarize_code = purrr::map_dfr(code, ~{
+    expr_id <<- expr_id + 1
+    # parse the function into table form for easy interrogation
+    gpd = getParseData(parse(text = deparse(.x)), includeText = TRUE);
+    grp_funcs = gpd %>% filter(token == "SYMBOL_FUNCTION_CALL") %>% select(text) %>% pull
+    
+    # search the namespace to find functions named `fn`.chunk_agg.disk.frame
+    # only allow one such function for now; TODO: improve it
+    #stopifnot(sum(paste0(unique(grp_funcs), ".chunk_agg.disk.frame") %in% list_of_chunk_agg_fns) == 1)
+    #stopifnot(sum(paste0(unique(grp_funcs), ".collected_agg.disk.frame") %in% list_of_collected_agg_fns) == 1)
+    stopifnot(sum(sapply(unique(grp_funcs), function(x) exists(paste0(x, ".chunk_agg.disk.frame")))) == 1)
+    stopifnot(sum(sapply(unique(grp_funcs), function(x) exists(paste0(x, ".collected_agg.disk.frame")))) == 1)
+    
+    
+    
+    temp_varn <<- temp_varn + 1
+    tmpcode = deparse(evalparseglue("substitute({deparse(.x)}, list({grp_funcs} = quote({grp_funcs}.chunk_agg.disk.frame)))")) %>% paste0(collapse = " ")
+    
+    chunk_code = data.frame(assign_to = as.character(glue::glue("tmp{temp_varn}")), expr = tmpcode, stringsAsFactors = FALSE)
+    
+    chunk_code$orig_code = deparse(.x)
+    chunk_code$expr_id = expr_id
+    chunk_code$grp_fn = grp_funcs
+    chunk_code$name = ifelse(is.null(names(code[expr_id])), "", names(code[expr_id]))
+    
+    # create the aggregation code
+    chunk_code$agg_expr = glue::glue("{grp_funcs}.collected_agg.disk.frame({paste0(chunk_code$assign_to, collapse=', ')})")
+    
+    #print(sapply(chunk_code, typeof))
+    chunk_code
+  })
+  
+  chunk_summ_code = paste0(summarize_code$assign_to, "=list(", summarize_code$expr, ")") %>% paste0(collapse = ", ")
+  
+  agg_code_df = summarize_code %>% 
+    select(expr_id, name, agg_expr, orig_code) %>% 
+    unique %>% 
+    transmute(agg_code = paste0(ifelse(name == "", paste0("`", orig_code, "` = "), paste0(name, "=")), agg_expr))
+  
+  agg_summ_code = paste0(agg_code_df$agg_code, collapse = ",")
+  
+  # get the by variables
+  group_by_cols = purrr::map_chr(attr(.data, "group_by_cols"), ~{deparse(.x)})
+  
+  list(group_by_cols = group_by_cols, chunk_summ_code = chunk_summ_code, agg_summ_code = agg_summ_code)
+  
+  # generate full code
+  code_to_run = glue::glue("chunk_group_by({group_by_cols}) %>% chunk_summarize({chunk_summ_code}) %>% collect %>% group_by({group_by_cols}) %>% summarize({agg_summ_code})")
+  
+  class(.data) <- c("summarized_disk.frame", "disk.frame")
+  attr(.data, "summarize_code") = code_to_run
+  .data
}

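For orientation, here is a sketch of the `code_to_run` string that the glue template above would assemble for a call such as `summarize(meanx = mean(x))` with a single grouping column `grp`. The `tmp1` temporary name is illustrative; the real code generates names of the form `tmp{n}`:

```r
# illustrative value of code_to_run (a string, to be evaluated later),
# reformatted across lines for readability
"chunk_group_by(grp) %>%
   chunk_summarize(tmp1 = list(mean.chunk_agg.disk.frame(x))) %>%
   collect %>%
   group_by(grp) %>%
   summarize(meanx = mean.collected_agg.disk.frame(tmp1))"
```

Note how the generated pipeline mirrors the two-stage pattern described in the group-by chapter: `mean.chunk_agg.disk.frame` runs per chunk, and `mean.collected_agg.disk.frame` finalizes over the collected per-chunk results.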
diff --git a/docs/404.html b/docs/404.html
index e2b3eb75..88202fcc 100644
--- a/docs/404.html
+++ b/docs/404.html
@@ -90,7 +90,7 @@