When you want to use random number generators (RNG) for parallel computations, you need to make sure that the sequences of random numbers used by the different processes do not overlap. There are two main approaches to this problem:1
The RNGs included in dqrng
offer at least one of these
methods for parallel RNG usage. When using the R or C++ interface
independent streams can be accessed using the two argument
dqset.seed(seed, stream)
or
dqset_seed(seed, stream)
functions.
The Threefry engine uses internally a counter with 2256 possible states, which can be
split into different substreams. When used from R or C++ with the two
argument dqset.seed
or dqset_seed
this counter
space is split into 264
streams with 2192 possible
states each. This is equivalent to 264 streams with a period of 2194 each.
In the following example a matrix with random numbers is generated in parallel using the parallel package. The resulting correlation matrix should be close to the identity matrix if the different streams are independent:
library(parallel)
cl <- parallel::makeCluster(2)
res <- clusterApply(cl, 1:8, function(stream, seed, N) {
library(dqrng)
dqRNGkind("Threefry")
dqset.seed(seed, stream)
dqrnorm(N)
}, 42, 1e6)
stopCluster(cl)
res <- matrix(unlist(res), ncol = 8)
symnum(x = cor(res), cutpoints = c(0.001, 0.003, 0.999),
symbols = c(" ", "?", "!", "1"),
abbr.colnames = FALSE, corr = TRUE)
Correlation matrix:
[1,] 1
[2,] 1
[3,] ? 1
[4,] ? ? 1
[5,] ? ? 1
[6,] ? 1
[7,] ? 1
[8,] ? 1
attr(,"legend")
[1] 0 ‘ ’ 0.001 ‘?’ 0.003 ‘!’ 0.999 ‘1’ 1
As expected the correlation matrix for the different columns is almost equal to the identity matrix.
The Xoshiro256+/++/** generators has a period of 2256 − 1 and offers 2128 sub-sequences that are 2128 random draws apart as well as
264 streams that are 2192 random draws apart. The
Xoroshiro128+/++/** generators has a period of 2128 − 1 and offers 264 sub-sequences that are 264 random draws apart as well as
232 streams that are 298 random draws apart. You can go
from one sub-sequence to the next using the jump()
or
long_jump()
method and the convenience wrapper
jump(int n)
or long_jump(int n)
, which
advances to the n
th sub-sequence. When used from R or C++
with the two argument dqset.seed
and
dqset_seed
you get 264 streams that are 2192 and 264 random draws apart for
Xoshiro256+/++/** and Xoroshiro128+/++/**, respectively.
As an example using C++ we draw and sum a large number of uniformly
distributed numbers. This is done several times sequentially as well as
using OpenMP for parallelisation. Care has been taken to keep the global
RNG rng
usable outside of the parallel block.
#include <Rcpp.h>
// [[Rcpp::depends(dqrng, BH)]]
#include <dqrng_distribution.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>
// [[Rcpp::export]]
std::vector<double> random_sum(int n, int m) {
dqrng::uniform_distribution dist(0.0, 1.0); // Uniform distribution [0,1)
auto rng = dqrng::generator<dqrng::xoshiro256plusplus>(); // seeded from R's RNG
std::vector<double> res(m);
for (int i = 0; i < m; ++i) {
double lres(0);
for (int j = 0; j < n; ++j) {
lres += dist(*rng);
}
res[i] = lres / n;
}
return res;
}
// [[Rcpp::export]]
std::vector<double> parallel_random_sum(int n, int m, int ncores) {
dqrng::uniform_distribution dist(0.0, 1.0); // Uniform distribution [0,1)
auto rng = dqrng::generator<dqrng::xoshiro256plusplus>(); // seeded from R's RNG
std::vector<double> res(m);
// ok to use rng here
#pragma omp parallel num_threads(ncores)
{
// make thread local copy of rng and advance it by 1 ... ncores jumps
auto lrng = rng->clone(omp_get_thread_num() + 1);
#pragma omp for
for (int i = 0; i < m; ++i) {
double lres(0);
for (int j = 0; j < n; ++j) {
lres += dist(*lrng);
}
res[i] = lres / n;
}
}
// ok to use rng here
return res;
}
/*** R
bm <- bench::mark(
parallel_random_sum(1e7, 8, 4),
random_sum(1e7, 8),
check = FALSE
)
bm[,1:4]
*/
Result:
expression min median `itr/sec`
<bch:expr> <bch:tm> <bch:tm> <dbl>
1 parallel_random_sum(1e+07, 8, 4) 98.3ms 99ms 10.1
2 random_sum(1e+07, 8) 270.2ms 271ms 3.68
Note that the thread local RNG uses std::unique_ptr
instead of Rcpp::Xptr
since it is used in parallel
code.
From the PCG family we will look at pcg64, a 64-bit generator with a
period of 2128. It offers
the function advance(int n)
,
which is equivalent to n
random draws but scales as O(ln(n))
instead of O(n). In
addition, it offers 2127
separate streams that can be enabled via the function set_stream(int n)
or the two
argument constructor with seed
and stream
.
When used from R or C++ with the two argument dqset.seed
and dqset_seed
you get 264 streams out of the possible
2127 separate streams.
In the following example a matrix with random numbers is generated in
parallel using RcppParallel. Instead of using the more traditional
approach of generating the random numbers from a certain distribution,
we are using the fast sampling methods from dqrng_sample.h
.
The resulting correlation matrix should be close to the identity matrix
if the different streams are independent:
#include <Rcpp.h>
// [[Rcpp::depends(dqrng, BH)]]
#include <dqrng_generator.h>
#include <dqrng_sample.h>
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
struct RandomFill : public RcppParallel::Worker {
RcppParallel::RMatrix<int> output;
uint64_t seed;
RandomFill(Rcpp::IntegerMatrix output, const uint64_t seed) : output(output), seed(seed) {};
void operator()(std::size_t begin, std::size_t end) {
auto rng = dqrng::generator<pcg64>(seed, end);
for (std::size_t col = begin; col < end; ++col) {
auto sampled = dqrng::sample::sample<std::vector<int>, uint32_t>(*rng, 100000, output.nrow(), true);
RcppParallel::RMatrix<int>::Column column = output.column(col);
std::copy(sampled.begin(), sampled.end(), column.begin());
}
}
};
// [[Rcpp::export]]
Rcpp::IntegerMatrix parallel_random_matrix(const int n, const int m, const int ncores) {
Rcpp::IntegerMatrix res(n, m);
RandomFill randomFill(res, 42);
RcppParallel::parallelFor(0, m, randomFill, m/ncores + 1);
return res;
}
/*** R
res <- parallel_random_matrix(1e6, 8, 4)
head(res)
symnum(x = cor(res), cutpoints = c(0.001, 0.003, 0.999),
symbols = c(" ", "?", "!", "1"),
abbr.colnames = FALSE, corr = TRUE)
*/
Head of the random matrix:
[,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
[1,] 67984 16279 69262 7126 21441 37720 51107 51045
[2,] 69310 21713 82885 81157 54051 5261 91165 17833
[3,] 76742 31232 78953 4626 94939 29416 85652 78296
[4,] 76349 47427 1770 37957 33888 59134 94591 65793
[5,] 85008 89224 43493 7925 60866 2464 14080 10763
[6,] 38017 88509 51195 73086 1883 68193 75259 62216
Correlation matrix:
[1,] 1
[2,] 1
[3,] ? 1
[4,] ? 1
[5,] 1
[6,] ? ? ? 1
[7,] ? 1
[8,] ? 1
attr(,"legend")
[1] 0 ‘ ’ 0.001 ‘?’ 0.003 ‘!’ 0.999 ‘1’ 1
So as expected the correlation matrix is almost equal to the identity matrix.
So far we have used our own RNG and either seeded it from R’s RNG or
with an explicit seed. As an alternative, we can also make use of
dqrng’s global RNG. This is exemplified in the template function
parallel_generate<>()
defined in the header file
dqrng_extra/parallel_generate.h
. A simple way to use this
template would be the following functions which generate random variates
according to the uniform, normal, or exponential distribution:
#include <Rcpp.h>
// [[Rcpp::depends(dqrng, BH, RcppParallel)]]
// [[Rcpp::plugins(openmp)]]
#include <dqrng_extra/parallel_generate.h>
#include <dqrng_distribution.h>
using dqrng::extra::parallel_generate;
// [[Rcpp::export]]
Rcpp::NumericVector runif_para(std::size_t n, double min = 0, double max = 1,
std::size_t threads = 2, std::size_t streams = 24) {
return parallel_generate<dqrng::uniform_distribution>(n, threads, streams, min, max);
}
// [[Rcpp::export]]
Rcpp::NumericVector rnorm_para(std::size_t n, double mean = 0, double sd = 1,
std::size_t threads = 2, std::size_t streams = 24) {
return parallel_generate<dqrng::normal_distribution>(n, threads, streams, mean, sd);
}
// [[Rcpp::export]]
Rcpp::NumericVector rexp_para(std::size_t n, double rate = 1,
std::size_t threads = 2, std::size_t streams = 24) {
return parallel_generate<dqrng::exponential_distribution>(n, threads, streams, rate);
}
Generating n
numbers is split into (about) equal
streams
streams that are processed by threads
threads. For an efficient distribution of the workload it is important
that streams
is a multiple of threads
, since
then every thread gets to process the same number of streams. The
default streams = 24
can be used together with 1, 2, 3, 4,
6, 8, 12, and 24 threads. The result for a given number of
streams is independent of the number of threads:
dqset.seed(42); norm1 <- rnorm_para(22, threads = 1)
dqset.seed(42); norm2 <- rnorm_para(22, threads = 4)
identical(norm1, norm2)
#> [1] TRUE
At the same time, the serial version is almost as fast as using the
normal dqrng::dqr<dist>
function and therefore much
faster than the stats::r<dist>
function. Using
multiple threads gives a clear speed up.
n <- 1e6
bench::mark(stats::runif(n),
dqrng::dqrunif(n),
runif_para(n, threads = 2L),
runif_para(n, threads = 1L),
check = FALSE)[, 1:6]
bench::mark(stats::rnorm(n),
dqrng::dqrnorm(n),
rnorm_para(n, threads = 2L),
rnorm_para(n, threads = 1L),
check = FALSE)[, 1:6]
bench::mark(stats::rexp(n),
dqrng::dqrexp(n),
rexp_para(n, threads = 2L),
rexp_para(n, threads = 1L),
check = FALSE)[, 1:6]
Here the actual implementation of the template function:
// Copyright 2024 Ralf Stubner
// Copyright 2024 Philippe Grosjean
//
// This file is part of dqrng.
//
// dqrng is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// dqrng is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with dqrng. If not, see <http://www.gnu.org/licenses/>.
#include <sstream>
#include <dqrng.h>
#include <RcppParallel/RVector.h>
#include <omp.h>
namespace dqrng {
namespace extra {
template<typename Dist, typename... Params>
Rcpp::NumericVector parallel_generate(std::size_t n,
std::size_t threads,
std::size_t streams,
Params&&... params) {
if (n < streams)
streams = n;
std::size_t stream_size = n / streams;
std::size_t remainder = n % streams;
// use RcppParallel::RVector as thread safe accessor
Rcpp::NumericVector res(Rcpp::no_init(n));
RcppParallel::RVector<double> work(res);
// use global RNG from dqrng
dqrng::random_64bit_accessor rng{};
std::stringstream buffer;
#ifdef _OPENMP
std::size_t maxthreads = omp_get_num_procs();
if (threads > maxthreads)
threads = maxthreads;
// No need for more threads than there are streams
if (threads > streams)
threads = streams;
#endif
#pragma omp parallel num_threads(threads)
{
std::size_t start,end;
#pragma omp for schedule(static,1)
for (std::size_t i = 0; i < streams; ++i) {
if (i < remainder) {
start = i * stream_size + i;
end = start + stream_size + 1;
} else {
start = i * stream_size + remainder;
end = start + stream_size;
}
// private RNG in each stream; RNG with i == 0 is identical to global RNG
auto prng = rng.clone(i);
prng->generate<Dist>(std::begin(work) + start, std::begin(work) + end,
std::forward<Params>(params)...);
if (i == 0) {// Save the state of the global RNG's clone
buffer << *prng;
}
}
}
// Make sure that the global RNG advances as well by applying the state
// of the global RNG's clone to the global RNG
buffer >> rng;
return res;
}
} // namespace extra
} // namespace dqrng
Besides the distribution of the work into (about) equal streams, the
pattern for RNG access is similar to the OpenMP example above with the
important difference that the local RNGs are not thread- but
stream-specific. This way, the result becomes independent of
the used amount of parallelism. However, one has to consider one
important aspect: After the parallel for loop, the global RNG has not
advanced at all. Calling the function repeatedly would lead to identical
results. To circumvent this, one of the stream specific RNGs is an exact
clone of the global RNG (clone(stream=0)
) and the state of
this RNG after processing its stream is saved and used to overwrite the
global RNG’s state.2 This way, the global RNG’s state advances
as if it had processed one of the streams and successive calls to
parallel_generate()
produce different random numbers as
expected.
dqset.seed(153)
runif_para(30)
#> [1] 0.87693642 0.14323366 0.33129746 0.07856319 0.80991119 0.37524485
#> [7] 0.90387542 0.38746776 0.30473153 0.01102334 0.21272306 0.11975609
#> [13] 0.98440547 0.13373340 0.82823735 0.87196225 0.14920422 0.27723804
#> [19] 0.59308120 0.07853078 0.63040483 0.21707435 0.25876379 0.81296194
#> [25] 0.53645030 0.29976254 0.37159454 0.38683266 0.03737063 0.03359113
runif_para(30) # Different values, as expected
#> [1] 0.90407135 0.73543499 0.09026296 0.90321975 0.66162669 0.51716146
#> [7] 0.74186366 0.41125413 0.17581202 0.68547734 0.11766549 0.82316789
#> [13] 0.40565668 0.44854610 0.95477820 0.64388593 0.31991691 0.02239872
#> [19] 0.13687388 0.32476719 0.67093851 0.05564081 0.76817620 0.49502455
#> [25] 0.07459706 0.25493312 0.14019729 0.89704659 0.40548199 0.53800443
Pitfall: should you use parallel_generate()
in
concurrent threads or streams, make sure to seed each of them with
enough separate streams to avoid overlap. For instance, with
parallel_generate(..., stream = 4)
, you could seed first
thread with something like dqset.seed(546, 1)
, but you must
seed second thread at least on stream 5 (previous stream plus the number
of streams you reserve for parallel_generate()
). If you use
dqset.seed(546, 2)
on the second thread, there will be an
overlap like this:
# Seed used in the first thread
dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4))
#> [1] 0.01904358 0.57750157 0.39156879 -1.72594164 1.24949453 -0.87535133
#> [7] -0.49878776 0.26077249
# Seed used in the second thread
dqset.seed(546, 2); (v2 <- rnorm_para(8, streams = 4))
#> [1] 0.3915688 -1.7259416 1.2494945 -0.8753513 -0.4987878 0.2607725
#> [7] 1.2018189 -0.1060487
Note how values 1-6 of v2 are identical to values 3-8 of v1 because
of an overlap of the streams consumed in each call to
parallel_generate()
. You will get the same result if the
two lines of code were run in separate threads or even, separate R
processes. Here, if second dqset.seed()
is shifted by 4
streams or more from first one, then there is no overlap any more:
# Seed used in the first thread
dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4))
#> [1] 0.01904358 0.57750157 0.39156879 -1.72594164 1.24949453 -0.87535133
#> [7] -0.49878776 0.26077249
# Seed used in the second thread
dqset.seed(546, 5); (v2 <- rnorm_para(8, streams = 4))
#> [1] 1.2018189 -0.1060487 -0.8532641 0.6531933 -0.8304053 -0.4745548
#> [7] -0.4211618 -0.5871540
See for example https://www.pcg-random.org/posts/critiquing-pcg-streams.html.↩︎
Note that here in contrast to the
default PCG implementation, streams are counted from the current stream,
i.e. setting stream
to 0
will give the same
RNG. This brings PCG in line with the other RNGs.↩︎