Урок 6 Многопоточность в R
6.1 Описание
Давайте представим ситуацию, что вам необходимо доствить 8 адресатам посылки. Если вы будете доставлять их одним курьером, то ему придётся по очереди посетить все 8 адресов, собрать подписи в качестве подтверждения о получении посылки, и принести вам подписанные документы. но если у вас в распоряжении будет 4 курьера, то вы сможете распределить каждому курьеру всего по 2 адреса, и процесс доставки займёт в 4 раза меньше времени.
Ок, а при чём тут вообще курьеры спросите вы. Во всех предыдущих уроках мы выполняли итерирование по элементов объектов в последовательном режиме, т.е. использовали одного курьера. Это преемлемый способ итерирования, но не самый эффективный. В этом уроке мы с вами разберёмся с тем, как задействовать сразу 4ёх курьеров, т.е. выполнять итерации в параллеьном, многопоточном режиме.
Так же мы можем сделать этот процесс ещё более эффективным, если будем не рандомно раздавать курьерам адресатов, а например распредим каждому курьеру по одному району, это балансировка нагрузки, её мы тоже затронем в этом уроке.
6.3 Тайм коды
00:00 Вступление.
00:51 Что такое многопоточность.
02:20 Какие пакеты мы будем использовать в ходе урока.
03:25 Используем foreach
в последовательном режиме.
07:42 Аргументы конструкции foreach.
10:05 Управление объединением результатов итераций цикла foreach.
11:05 Выполнение foreach
в многопоточном режиме.
12:41 Схема реализации многопоточности.
13:52 Возвращение к последовательному выполнению и ID процесса.
14:56 Бекенды к foreach.
15:38 Оператор %dorng%
.
18:10 Параллельная реализация функций семейства apply.
20:52 Список функций пакетов parallel
и pbapply.
21:54 Пакет furrr
.
23:10 Соответствие функций пакета purrr
и furrr
.
23:50 Заключение.
6.4 Код
# многопоточные циклы -----------------------------------------------------
# install.packages("doSNOW")
# library(doSNOW)
# library(doParallel)
library(doFuture)
# функция длительного выполнения
pause <- function(min = 1, max = 3) {
ptime <- runif(1, min, max)
Sys.sleep(ptime)
out <- list(
pid = Sys.getpid(),
pause_sec = ptime
)
}
test <- pause()
# используем foreach
# итерируемся сразу по двум объектам
system.time (
{test2 <- foreach(min = 1:3, max = 2:4) %do% pause(min, max)}
)
# сумма длительностей пауз
sum(sapply(test2, '[[', i = 'pause_sec'))
# меняем функцию собирающую результаты каждой итерации
test3 <- foreach(min = 1:3, max = 2:4, .combine = dplyr::bind_rows) %do% pause(min, max)
# параллельный режим выполнения
# создаём кластер из четырёх ядер
#cl <- makeCluster(4)
#registerDoSNOW(cl)
options(future.rng.onMisuse = "ignore")
registerDoFuture()
plan('multisession', workers = 3)
# выполняем тот же код но в параллельном режиме
system.time (
{
par_test1 <-
foreach(min = 1:3, max = 2:4, .combine = dplyr::bind_rows) %dopar% {
pause(min, max)
}
}
)
# останавливаем кластер
plan('sequential')
par_test1
# многопоточный вариант функций apply -------------------------------------
library(pbapply)
library(parallel)
# создаём кластер из четырёх ядер
cl <- makeCluster(3)
# пример с pbapply
par_test2 <- pblapply(rep(1, 3), FUN = pause, max = 3, cl = cl)
# пример с parallel
par_test3 <- parLapply(rep(1, 3), fun = pause, max = 3, cl = cl)
# останавливаем кластер
stopCluster(cl)
# многопоточный purrr -----------------------------------------------------
library(furrr)
plan('multisession', workers = 3)
par_test4 <- future_map2(1:3, 2:4, pause)
# останавливаем кластер
plan('sequential')