-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathparallel.qmd
More file actions
238 lines (168 loc) · 9.5 KB
/
parallel.qmd
File metadata and controls
238 lines (168 loc) · 9.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# Parallelisation
```{r}
#| include: false
source("common.R")
```
Both `dust2` and `monty` have built-in support for parallelisation, making it straightforward to take advantage of having multiple cores available. There are three main ways to parallelise, depending on what part of the computation you want to speed up:
1. **Using workers to parallelise between chains**
This runs each MCMC chain in its own process. It is useful whenever you are running multiple chains, regardless of whether your model is deterministic or stochastic, or whether it is written in `odin2` or not.
2. **Using threads to parallelise between particles in a filter**
This runs different set of particles from a particle filter in parallel, within a single process. It is useful when you have a stochastic model written in `odin2` and are using the filter to estimate the marginal likelihood. Using threads can be combined with using workers.
3. **Manual scheduling of chains**
This is a fully manual way of distributing chains - for example, across the nodes of an HPC system - rather than letting `monty` manage the workers automatically.
We illustrate how to implement all three methods below. Note that all the three parallelisation methods are implemented so that results will be identical to running in serial.
## Using workers to parallelise between chains
```{r}
library(monty)
```
Let's revisit the simple Gaussian mixture model example from @sec-monty-simple-example, and sample 4 chains of length 1000.
```{r}
fn <- function(l, p, m1, m2) {
log(p * dnorm(l, mean = m1) + (1 - p) * dnorm(l, mean = m2))
}
mixture_model <- monty_model_function(fn,
fixed = list(p = 0.75, m1 = 3, m2 = 7),
allow_multiple_parameters = TRUE)
sampler <- monty_sampler_random_walk(matrix(2))
set.seed(1)
samples <- monty_sample(mixture_model, sampler, 1000, initial = 3, n_chains = 4)
```
One of the available inputs to `monty_sample` that we have not specified is `runner`. The runner determines how the chains are run, and these can be setup with functions in `monty`. If `runner` is not specified then as a default the samples will be run with the simplest runner which is constructed with `monty_runner_serial`. With this runner, chains are run in series (one after another).
However if we have multiple cores available, we may want to run chains in parallel using workers. In this case we can construct a runner with `monty_runner_callr`. This runner uses the `callr` package to distribute your chains over a number of worker processes on the same machine.
Suppose we have 2 cores, then we will setup the runner setting `n_workers = 2`.
```{r}
runner <- monty_runner_callr(n_workers = 2)
```
This indicates that we can run 2 chains in parallel. Running 4 chains in our example will mean that each worker process will run 2 chains in sequence. Ideally if you have sufficiently many cores available, you will set `n_workers` equal to the number of chains. Otherwise typically you want `n_workers` to be an even divisor of the number of chains - for 4 chains using 3 workers is not likely to be much faster than using 2 workers, as one of the workers would still need to run 2 chains in sequence.
We can run chains in parallel by using our runner
```{r}
set.seed(1)
samples2 <- monty_sample(mixture_model, sampler, 1000, initial = 3, n_chains = 4,
runner = runner)
```
We have set the seed before running in serial and before running in parallel so we can compare the output, and we can see that they produce identical results.
```{r}
identical(samples, samples2)
```
Notice that it has actually taken longer to run in parallel! This is because there is a cost to forking across the cores that outweighs the benefit of running in parallel in this case. If we run longer chains,
in serial
```{r}
set.seed(1)
samples <- monty_sample(mixture_model, sampler, 100000, initial = 3, n_chains = 4)
```
and then in parallel
```{r}
runner <- monty_runner_callr(2)
set.seed(1)
samples2 <- monty_sample(mixture_model, sampler, 100000, initial = 3, n_chains = 4,
runner = runner)
```
we now see that we are running quicker in parallel! In general, the longer each chain takes to run and the fewer chains you need to run per worker, the greater the benefit of running in parallel.
## Using threads to parallelise among particles in the filter
```{r}
library(odin2)
library(dust2)
```
Let's consider the following discrete-time, stochastic SIR model
```{r}
sir <- odin({
update(S) <- S - n_SI
update(I) <- I + n_SI - n_IR
update(R) <- R + n_IR
update(incidence) <- incidence + n_SI
p_SI <- 1 - exp(-beta * I / N * dt)
p_IR <- 1 - exp(-gamma * dt)
n_SI <- Binomial(S, p_SI)
n_IR <- Binomial(I, p_IR)
initial(S) <- N - I0
initial(I) <- I0
initial(R) <- 0
initial(incidence, zero_every = 1) <- 0
N <- parameter(1000)
I0 <- parameter(10)
beta <- parameter(0.2)
gamma <- parameter(0.1)
cases <- data()
cases ~ Poisson(incidence)
})
```
Which we will fit to the (synthetic) data set [`data/incidence.csv`](data/incidence.csv).
```{r}
d <- read.csv("data/incidence.csv")
head(d)
```
We will setup our packer, prior and sampler
```{r}
packer <- monty::monty_packer(scalar = c("beta", "gamma"),
fixed = list(I0 = 10, N = 1000))
prior <- monty::monty_dsl({
beta ~ Exponential(mean = 0.3)
gamma ~ Exponential(mean = 0.1)
})
vcv <- diag(2) * 0.01
sampler <- monty::monty_sampler_random_walk(vcv)
```
and then we will setup our likelihood using a filter with 100 particles and then run 4 chains without any parallelisation
```{r}
filter <- dust_filter_create(sir, time_start = 0,
data = d, dt = 0.25,
n_particles = 200)
likelihood <- dust_likelihood_monty(filter, packer,
save_trajectories = TRUE)
posterior <- likelihood + prior
samples <- monty_sample(posterior, sampler, n_steps = 1000,
initial = c(0.3, 0.1),
n_chains = 4)
```
An input to `dust_filter_create` that we have not specified is `n_threads`, which has a default value of 1. Threads can be used in the filter to run particles forward in parallel. Suppose we have two available cores and we want to use these for threads in the filter, then we would set `n_threads = 2`
```{r}
filter <- dust_filter_create(sir, time_start = 0,
data = d, dt = 0.25,
n_particles = 200,
n_threads = 2)
likelihood <- dust_likelihood_monty(filter, packer,
save_trajectories = TRUE)
posterior <- likelihood + prior
samples <- monty_sample(posterior, sampler, n_steps = 1000,
initial = c(0.3, 0.1),
n_chains = 4)
```
and we can see this is quicker than without parallelisation. As with workers and chains, generally the number of threads should be an even divisor of the number of particles.
You can use workers in combination with threads. To illustrate how to prioritise workers vs threads, let's instead use our 2 cores for workers.
```{r}
filter <- dust_filter_create(sir, time_start = 0,
data = d, dt = 0.25,
n_particles = 200,
n_threads = 1)
likelihood <- dust_likelihood_monty(filter, packer,
save_trajectories = TRUE)
posterior <- likelihood + prior
runner <- monty_runner_callr(n_workers = 2)
samples <- monty_sample(posterior, sampler, n_steps = 1000,
initial = c(0.3, 0.1),
n_chains = 4,
runner = runner)
```
We see that this was quicker - this is because there is no communication between workers while the chains run, but there is regular communication between threads during while the filter runs. Thus it is better to prioritise workers over threads. Suppose you have 32 cores available and are running 4 chains. Ideally you would set `n_workers = 4` in `monty_runner_callr` to ensure all 4 chains run at the same time, and then set `n_threads = 8` (`n_threads` represents the number of threads per worker) to use all 32 cores, and choose `n_particles` to be a multiple of 8.
## Using manual scheduling to parallelise between chains
Suppose you have access to an HPC system with multiple nodes of e.g. 32 cores. Efficient combination of workers and threads on a single 32-core node is already beneficial. Manual scheduling with `monty` enables running chains on different nodes, essentially increasing the number of cores available. For example, instead of running 4 chains with 4 workers on a 32-core node resulting in 8 threads per chain/worker, you could run 4 chains on 4 32-core nodes using 32 cores per chain (using 128 cores in total).
Instead of using `monty_sample`, first we would need to prepare the chains
```r
monty_sample_manual_prepare(posterior, sampler, n_steps = 1000,
path = "mypath",
initial = c(0.3, 0.1),
n_chains = 4)
```
where here `"mypath"` is the location for writing outputs to. Cluster jobs are submitted for each chain, running the chain with
```r
monty_sample_manual_run(1, path = "mypath")
```
for chain 1,
```r
monty_sample_manual_run(2, path = "mypath")
```
for chain 2, and so on.
Once all chains have been run, results from the chains can be collected together
```r
samples <- monty_sample_manual_collect("mypath")
```