Урок 7 Пакет future

7.1 Описание

В заключительном уроке этого курса мы познакомимся с наиболее продвинутым интерйесом параллельного программирования на языке R, который предоставляет пакет future.

7.2 Видео

7.3 Тайм коды

00:00 Вступление.
01:15 Явное и неявное объявление фьючерсов.
04:33 Аргументы функции ё.
05:40 Локальное окружение фьючерса.
06:42 Стратегии выполнения вычислений в пакете future.
08:20 Как менять стратегию выполнения кода с помощью future.
10:42 Настройка плана cluster.
12:09 Вложенные друг в друга фьючерсы.
18:00 Отладка ошибок в фьючерсах.
19:03 Многопоточное итерирование с помощью future.
21:58 Пример использования future в многопоточном режиме.
26:07 Опции и переменные среды пакета future.
28:00 Другие пакеты входящие в библиотеку futureverse.
29:10 Заключение.

7.4 Код

library(future)
library(dplyr)

# явное и неявное объявление фьючерсов ------------------------------------
# обычное выражение
v <- {
  cat("Hello world!\n")
  3.14
}

# неявное объявление фьчерса
v %<-% {
  cat("Hello world!\n")
  3.14
}

# явное объявления фьючерса
f <- future({
  cat("Hello world!\n")
  3.14
})
v <- value(f)
resolved(f)
# фьючерс выполняет все вычисления в собственном окружении -----------------
a <- 1

x %<-% {
  a <- 2
  2 * a
}

x

a

# изменяем план выполнения фьючерса ---------------------------------------
## последовательное выполнение
plan(sequential)
pid <- Sys.getpid()
pid

a %<-% {
  pid <- Sys.getpid()
  cat("Future 'a' ...\n")
  3.14
  }
b %<-% {
  cat("Future 'b' ...\n")
  Sys.getpid()
  }
c %<-% {
  cat("Future 'c' ...\n")
  2 * a
  }

b
c
a
pid

## ассинхронное выполнение
### режим параллельно запущенных сеансов R
plan(multisession)
pid <- Sys.getpid()
pid

a %<-% {
  pid <- Sys.getpid()
  cat("Future 'a' ...\n")
  cat('pid: ', pid)
  3.14
  }
b %<-% {
  cat("Future 'b' ...\n")
  Sys.getpid()
  }
c %<-% {
  cat("Future 'c' ...\n")
  2 * a
}

b

c

a

pid

plan(sequential)

# просмотрт доступного количества потоков
availableCores()

### кластерное развёртывание
library(parallel)
cl <- parallel::makeCluster(3)
plan(cluster, workers = cl)

pid <- Sys.getpid()
pid

a %<-% {
  pid <- Sys.getpid()
  cat("Future 'a' ...\n")
  cat('pid: ', pid)
  3.14
}
b %<-% {
  cat("Future 'b' ...\n")
  Sys.getpid()
}
c %<-% {
  cat("Future 'c' ...\n")
  2 * a
}

b

c

a

pid

parallel::stopCluster(cl)


# вложенные фьчерсы -------------------------------------------------------
plan(list(multisession, sequential))
# plan(list(sequential, multisession))

# указываем количество ядер для каждого процесса
# plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 2)))
pid <- Sys.getpid()
a %<-% {
  cat("Future 'a' ...\n")
  Sys.getpid()
  }
b %<-% {
  cat("Future 'b' ...\n")
  b1 %<-% {
    cat("Future 'b1' ...\n")
    Sys.getpid()
    }
  b2 %<-% {
    cat("Future 'b2' ...\n")
    Sys.getpid()
    }
  c(b.pid = Sys.getpid(), b1.pid = b1, b2.pid = b2)
  }

pid

a
b

plan(sequential)


# обработка ошибок в фьючерсах --------------------------------------------
plan(sequential)
b <- "hello"
a %<-% {
  cat("Future 'a' ...\n")
  log(b)
  } %lazy% TRUE

a

# смотрим где именно была ошибка
backtrace(a)

# используем итерирование в параллельныъ процессах ------------------------
# тестовая функция
manual_pause <- function(x) {
  Sys.sleep(x)
  out <- list(pid = Sys.getpid(), pause = x)
  return(out)
} 

# паузы
pauses <- c(0.5, 2, 3, 2.5) 

# тест
manual_pause(2)

# активируем параллельные вычисления
plan("multisession", workers = 4)
# итерируемся
futs <- lapply(pauses, function(i) future({ manual_pause(i) }))
# проверяем статус выполнения фьючерсов
sapply(futs, resolved) 
# собираем результаты
res <- lapply(futs, value)    

dplyr::bind_rows(res)


# используем future совместно с promises ----------------------------------
library(cli)
options(cli.progress_show_after = 0, 
        cli.spinner = "dots")

# паузы
pauses.1 <- sample(1:5, 4, replace = T)
pauses.2 <- sample(2:3, 4, replace = T)
pauses.3 <- sample(3:6, 4, replace = T)

# первое длительное вычисление
plan(list(
  tweak(multisession, workers = 3), 
  tweak(multisession, workers = 4)
  )
)

val1 <- future(
  {
    futs <- lapply(pauses.1, function(i) future({ manual_pause(i) }))
    res  <- lapply(futs, value) 
    res  <- dplyr::bind_rows(res)
  }
) 

val2 <- future(
  {
    futs <- lapply(pauses.2, function(i) future({ manual_pause(i) }))
    res  <- lapply(futs, value) 
    res  <- dplyr::bind_rows(res)
  }
) 

val3 <- future(
  {
    futs <- lapply(pauses.3, function(i) future({ manual_pause(i) }))
    res  <- lapply(futs, value) 
    res  <- dplyr::bind_rows(res)
  }
) 

# ждём выполнения всех фьючерсов
cli_progress_bar("Waiting")
while ( ! (resolved(val1) | resolved(val2) | resolved(val3)) ) {
  cli_progress_update()
}

cli_progress_update(force = TRUE)

# result table
lapply(list(val1, val2, val3), value) %>% 
  bind_rows() %>%  
  mutate(main_pid = Sys.getpid()) %>% 
  print() %>%
  pull(pause) %>% 
  sum()  %>% 
  cat("\n", "Sum of all pauses: ", ., "\n")

plan(sequential)

7.5 Презентация

7.6 Тест