--- title: "Parallel RNG usage" author: "Ralf Stubner" date: "`r Sys.Date()`" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Parallel RNG usage} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r setup, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) library(dqrng) evaluate <- FALSE ``` When you want to use random number generators (RNG) for parallel computations, you need to make sure that the sequences of random numbers used by the different processes do not overlap. There are two main approaches to this problem:^[See for example https://www.pcg-random.org/posts/critiquing-pcg-streams.html.] * Partition the complete sequence of random numbers produced for one seed into non-overlapping sequences and assign each process one sub-sequence. * Re-parametrize the generator to produce independent sequences for the same seed. The RNGs included in `dqrng` offer at least one of these methods for parallel RNG usage. When using the R or C++ interface independent streams can be accessed using the two argument `dqset.seed(seed, stream)` or `dqset_seed(seed, stream)` functions. # Threefry: usage from R The Threefry engine uses internally a counter with $2^{256}$ possible states, which can be split into different substreams. When used from R or C++ with the two argument `dqset.seed` or `dqset_seed` this counter space is split into $2^{64}$ streams with $2^{192}$ possible states each. This is equivalent to $2^{64}$ streams with a period of $2^{194}$ each. In the following example a matrix with random numbers is generated in parallel using the parallel package. The resulting correlation matrix should be close to the identity matrix if the different streams are independent: ```{r, eval=evaluate} library(parallel) cl <- parallel::makeCluster(2) res <- clusterApply(cl, 1:8, function(stream, seed, N) { library(dqrng) dqRNGkind("Threefry") dqset.seed(seed, stream) dqrnorm(N) }, 42, 1e6) stopCluster(cl) res <- matrix(unlist(res), ncol = 8) symnum(x = cor(res), cutpoints = c(0.001, 0.003, 0.999), symbols = c(" ", "?", "!", "1"), abbr.colnames = FALSE, corr = TRUE) ``` Correlation matrix: [1,] 1 [2,] 1 [3,] ? 1 [4,] ? ? 1 [5,] ? ? 1 [6,] ? 1 [7,] ? 1 [8,] ? 1 attr(,"legend") [1] 0 ‘ ’ 0.001 ‘?’ 0.003 ‘!’ 0.999 ‘1’ 1 As expected the correlation matrix for the different columns is almost equal to the identity matrix. # Xo(ro)shiro: jump ahead with OpenMP The Xoshiro256+/++/** generators has a period of $2^{256} -1$ and offers $2^{128}$ sub-sequences that are $2^{128}$ random draws apart as well as $2^{64}$ streams that are $2^{192}$ random draws apart. The Xoroshiro128+/++/** generators has a period of $2^{128} -1$ and offers $2^{64}$ sub-sequences that are $2^{64}$ random draws apart as well as $2^{32}$ streams that are $2^{98}$ random draws apart. You can go from one sub-sequence to the next using the `jump()` or `long_jump()` method and the convenience wrapper `jump(int n)` or `long_jump(int n)`, which advances to the `n`th sub-sequence. When used from R or C++ with the two argument `dqset.seed` and `dqset_seed` you get $2^{64}$ streams that are $2^{192}$ and $2^{64}$ random draws apart for Xoshiro256+/++/** and Xoroshiro128+/++/**, respectively. As an example using C++ we draw and sum a large number of uniformly distributed numbers. This is done several times sequentially as well as using OpenMP for parallelisation. Care has been taken to keep the global RNG `rng` usable outside of the parallel block. ```{r, eval=evaluate, engine='Rcpp'} #include // [[Rcpp::depends(dqrng, BH)]] #include // [[Rcpp::plugins(openmp)]] #include // [[Rcpp::export]] std::vector random_sum(int n, int m) { dqrng::uniform_distribution dist(0.0, 1.0); // Uniform distribution [0,1) auto rng = dqrng::generator(); // seeded from R's RNG std::vector res(m); for (int i = 0; i < m; ++i) { double lres(0); for (int j = 0; j < n; ++j) { lres += dist(*rng); } res[i] = lres / n; } return res; } // [[Rcpp::export]] std::vector parallel_random_sum(int n, int m, int ncores) { dqrng::uniform_distribution dist(0.0, 1.0); // Uniform distribution [0,1) auto rng = dqrng::generator(); // seeded from R's RNG std::vector res(m); // ok to use rng here #pragma omp parallel num_threads(ncores) { // make thread local copy of rng and advance it by 1 ... ncores jumps auto lrng = rng->clone(omp_get_thread_num() + 1); #pragma omp for for (int i = 0; i < m; ++i) { double lres(0); for (int j = 0; j < n; ++j) { lres += dist(*lrng); } res[i] = lres / n; } } // ok to use rng here return res; } /*** R bm <- bench::mark( parallel_random_sum(1e7, 8, 4), random_sum(1e7, 8), check = FALSE ) bm[,1:4] */ ``` Result: expression min median `itr/sec` 1 parallel_random_sum(1e+07, 8, 4) 98.3ms 99ms 10.1 2 random_sum(1e+07, 8) 270.2ms 271ms 3.68 Note that the thread local RNG uses `std::unique_ptr` instead of `Rcpp::Xptr` since it is used in parallel code. # PCG: multiple streams with RcppParallel From the PCG family we will look at pcg64, a 64-bit generator with a period of $2^{128}$. It offers the function [`advance(int n)`](https://www.pcg-random.org/using-pcg-cpp.html#void-advance-state-type-delta), which is equivalent to `n` random draws but scales as $O(ln(n))$ instead of $O(n)$. In addition, it offers $2^{127}$ separate streams that can be enabled via the function [`set_stream(int n)`](https://www.pcg-random.org/using-pcg-cpp.html#void-pcg32-set-stream-state-type-stream) or the [two argument constructor](https://www.pcg-random.org/using-pcg-cpp.html#pcg32-pcg32-constructor) with `seed` and `stream`. When used from R or C++ with the two argument `dqset.seed` and `dqset_seed` you get $2^{64}$ streams out of the possible $2^{127}$ separate streams. In the following example a matrix with random numbers is generated in parallel using RcppParallel. Instead of using the more traditional approach of generating the random numbers from a certain distribution, we are using the fast sampling methods from `dqrng_sample.h`. The resulting correlation matrix should be close to the identity matrix if the different streams are independent: ```{r, eval=evaluate, engine='Rcpp'} #include // [[Rcpp::depends(dqrng, BH)]] #include #include // [[Rcpp::depends(RcppParallel)]] #include struct RandomFill : public RcppParallel::Worker { RcppParallel::RMatrix output; uint64_t seed; RandomFill(Rcpp::IntegerMatrix output, const uint64_t seed) : output(output), seed(seed) {}; void operator()(std::size_t begin, std::size_t end) { auto rng = dqrng::generator(seed, end); for (std::size_t col = begin; col < end; ++col) { auto sampled = dqrng::sample::sample, uint32_t>(*rng, 100000, output.nrow(), true); RcppParallel::RMatrix::Column column = output.column(col); std::copy(sampled.begin(), sampled.end(), column.begin()); } } }; // [[Rcpp::export]] Rcpp::IntegerMatrix parallel_random_matrix(const int n, const int m, const int ncores) { Rcpp::IntegerMatrix res(n, m); RandomFill randomFill(res, 42); RcppParallel::parallelFor(0, m, randomFill, m/ncores + 1); return res; } /*** R res <- parallel_random_matrix(1e6, 8, 4) head(res) symnum(x = cor(res), cutpoints = c(0.001, 0.003, 0.999), symbols = c(" ", "?", "!", "1"), abbr.colnames = FALSE, corr = TRUE) */ ``` Head of the random matrix: [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [1,] 67984 16279 69262 7126 21441 37720 51107 51045 [2,] 69310 21713 82885 81157 54051 5261 91165 17833 [3,] 76742 31232 78953 4626 94939 29416 85652 78296 [4,] 76349 47427 1770 37957 33888 59134 94591 65793 [5,] 85008 89224 43493 7925 60866 2464 14080 10763 [6,] 38017 88509 51195 73086 1883 68193 75259 62216 Correlation matrix: [1,] 1 [2,] 1 [3,] ? 1 [4,] ? 1 [5,] 1 [6,] ? ? ? 1 [7,] ? 1 [8,] ? 1 attr(,"legend") [1] 0 ‘ ’ 0.001 ‘?’ 0.003 ‘!’ 0.999 ‘1’ 1 So as expected the correlation matrix is almost equal to the identity matrix. # Using the global RNG So far we have used our own RNG and either seeded it from R's RNG or with an explicit seed. As an alternative, we can also make use of dqrng's global RNG. This is exemplified in the template function `parallel_generate<>()` defined in the header file `dqrng_extra/parallel_generate.h`. A simple way to use this template would be the following functions which generate random variates according to the uniform, normal, or exponential distribution: ```{r, eval=evaluate, engine='Rcpp'} #include // [[Rcpp::depends(dqrng, BH, RcppParallel)]] // [[Rcpp::plugins(openmp)]] #include #include using dqrng::extra::parallel_generate; // [[Rcpp::export]] Rcpp::NumericVector runif_para(std::size_t n, double min = 0, double max = 1, std::size_t threads = 2, std::size_t streams = 24) { return parallel_generate(n, threads, streams, min, max); } // [[Rcpp::export]] Rcpp::NumericVector rnorm_para(std::size_t n, double mean = 0, double sd = 1, std::size_t threads = 2, std::size_t streams = 24) { return parallel_generate(n, threads, streams, mean, sd); } // [[Rcpp::export]] Rcpp::NumericVector rexp_para(std::size_t n, double rate = 1, std::size_t threads = 2, std::size_t streams = 24) { return parallel_generate(n, threads, streams, rate); } ``` Generating `n` numbers is split into (about) equal `streams` streams that are processed by `threads` threads. For an efficient distribution of the workload it is important that `streams` is a multiple of `threads`, since then every thread gets to process the same number of streams. The default `streams = 24` can be used together with 1, 2, 3, 4, 6, 8, 12, and 24 threads. The result for a *given number of streams* is *independent of the number of threads*: ```{r, eval=evaluate} dqset.seed(42); norm1 <- rnorm_para(22, threads = 1) dqset.seed(42); norm2 <- rnorm_para(22, threads = 4) identical(norm1, norm2) #> [1] TRUE ``` At the same time, the serial version is almost as fast as using the normal `dqrng::dqr` function and therefore much faster than the `stats::r` function. Using multiple threads gives a clear speed up. ```{r, eval=evaluate} n <- 1e6 bench::mark(stats::runif(n), dqrng::dqrunif(n), runif_para(n, threads = 2L), runif_para(n, threads = 1L), check = FALSE)[, 1:6] bench::mark(stats::rnorm(n), dqrng::dqrnorm(n), rnorm_para(n, threads = 2L), rnorm_para(n, threads = 1L), check = FALSE)[, 1:6] bench::mark(stats::rexp(n), dqrng::dqrexp(n), rexp_para(n, threads = 2L), rexp_para(n, threads = 1L), check = FALSE)[, 1:6] ``` Here the actual implementation of the template function: ```{r, eval=FALSE, engine='Rcpp', file="../inst/include/dqrng_extra/parallel_generate.h"} ``` Besides the distribution of the work into (about) equal streams, the pattern for RNG access is similar to the OpenMP example above with the important difference that the local RNGs are not *thread-* but *stream-specific*. This way, the result becomes independent of the used amount of parallelism. However, one has to consider one important aspect: After the parallel for loop, the global RNG has not advanced at all. Calling the function repeatedly would lead to identical results. To circumvent this, one of the stream specific RNGs is an exact clone of the global RNG (`clone(stream=0)`) and the state of this RNG after processing its stream is saved and used to overwrite the global RNG's state.^[Note that **here** in contrast to the default PCG implementation, streams are counted from the current stream, i.e. setting `stream` to `0` will give the same RNG. This brings PCG in line with the other RNGs.] This way, the global RNG's state advances as if it had processed one of the streams and successive calls to `parallel_generate()` produce different random numbers as expected. ```{r, eval=evaluate} dqset.seed(153) runif_para(30) #> [1] 0.87693642 0.14323366 0.33129746 0.07856319 0.80991119 0.37524485 #> [7] 0.90387542 0.38746776 0.30473153 0.01102334 0.21272306 0.11975609 #> [13] 0.98440547 0.13373340 0.82823735 0.87196225 0.14920422 0.27723804 #> [19] 0.59308120 0.07853078 0.63040483 0.21707435 0.25876379 0.81296194 #> [25] 0.53645030 0.29976254 0.37159454 0.38683266 0.03737063 0.03359113 runif_para(30) # Different values, as expected #> [1] 0.90407135 0.73543499 0.09026296 0.90321975 0.66162669 0.51716146 #> [7] 0.74186366 0.41125413 0.17581202 0.68547734 0.11766549 0.82316789 #> [13] 0.40565668 0.44854610 0.95477820 0.64388593 0.31991691 0.02239872 #> [19] 0.13687388 0.32476719 0.67093851 0.05564081 0.76817620 0.49502455 #> [25] 0.07459706 0.25493312 0.14019729 0.89704659 0.40548199 0.53800443 ``` Pitfall: should you use `parallel_generate()` in concurrent threads or streams, make sure to seed each of them with enough separate streams to avoid overlap. For instance, with `parallel_generate(..., stream = 4)`, you could seed first thread with something like `dqset.seed(546, 1)`, but you must seed second thread at least on stream 5 (previous stream plus the number of streams you reserve for `parallel_generate()`). If you use `dqset.seed(546, 2)` on the second thread, there will be an overlap like this: ```{r, eval=evaluate} # Seed used in the first thread dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4)) #> [1] 0.01904358 0.57750157 0.39156879 -1.72594164 1.24949453 -0.87535133 #> [7] -0.49878776 0.26077249 # Seed used in the second thread dqset.seed(546, 2); (v2 <- rnorm_para(8, streams = 4)) #> [1] 0.3915688 -1.7259416 1.2494945 -0.8753513 -0.4987878 0.2607725 #> [7] 1.2018189 -0.1060487 ``` Note how values 1-6 of v2 are identical to values 3-8 of v1 because of an overlap of the streams consumed in each call to `parallel_generate()`. You will get the same result if the two lines of code were run in separate threads or even, separate R processes. Here, if second `dqset.seed()` is shifted by 4 streams or more from first one, then there is no overlap any more: ```{r, eval=evaluate} # Seed used in the first thread dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4)) #> [1] 0.01904358 0.57750157 0.39156879 -1.72594164 1.24949453 -0.87535133 #> [7] -0.49878776 0.26077249 # Seed used in the second thread dqset.seed(546, 5); (v2 <- rnorm_para(8, streams = 4)) #> [1] 1.2018189 -0.1060487 -0.8532641 0.6531933 -0.8304053 -0.4745548 #> [7] -0.4211618 -0.5871540 ```