3 Paralelización con R en el Cluster (parallel)
3.1 Conexión Remota al Clúster (SSH )
3.1.1 Windows
- PuttyGen.exe (generación clave pública y privada)
- Putty.exe (conexión ssh al cluster)
El paquete completo de Putty puede descargarse de aquí https://www.chiark.greenend.org.uk/~sgtatham/putty/latest.html
3.1.2 Linux-Mac OS.
Para equipos linux y Mac OS la comunicación a través de SSH se puede efectuar directamente desde la terminal
ssh usuario@login.hpc.cedia.org.ec
Tanto para Windows como para Linux-Mac OS la conexión al clúster CEDIA se establecerá de esta manera:
así la comunicación se efectúa por línea de comandos (terminal). Ahora vamos a generar nuestra contraseña, la cual usaremos para acceder a RStudio en el cluster:
$passwd
3.2 Rstudio Server como interfaz en HPC
La siguiente tabla ha sido adaptada de https://support.rstudio.com/hc/en-us/articles/236226087-Scaling-R-and-RStudio
Caso de uso | Problema | Solución | Tecnología |
---|---|---|---|
Escalar varios usuarios R | Flujos de trabajo regulares para un equipo. Cargar datos desde archivos o warehouses | Plataforma para soporte a varias sesiones interactivas R | RStudio Server, RStudio Server Pro + Load Balancer |
Escalar para HPC | Tareas de paralelización: bootstrapping, cross validation, scoring, ajustes de modelos en sets independientes | Desarrollo de código en sesiones R interactivas en RStudio. Someter jobs en procesos esclavos R. R debe estar instalado en todos los nodos. | Local: parallel, Rmpi, snow, Rcpp parallel; Cluster: LSF, SLURM, Torque, Docker Swarm, batchtools |
Escalar para Big Data | Big data, rutinas caja negra que requieren ajustar un modelo en un dominio completo. Los datos no caben en un solo computador. | El trabajo pesado se deja a un motor de cómputo diferente. La sintaxis de R se usa para construir pipelines, y analizar resultados. | Hadoop, Spark, Tensorflow, Oracle BDA, Microsoft R Server, Aster, H20.ai |
3.3 Conexión remota a RStudio (Interfaz gráfica)
En sesiones pasadas ya se ha efectuado la generación de claves públicas y privadas desde nuestros equipos locales para posibilitar la conexión al cluster de CEDIA.
La conexión a través de interfaz gráfica a RStudio se realiza ingresando a la dirección https://rstudio.cedia.edu.ec donde se solicitan las credenciales de usuario.
3.3.0.1 Sobre los repositorios CRAN
La lista completa de repositios CRAN se encuentra en https://cran.r-project.org/mirrors.html
CEDIA cuenta con un respositorio CRAN en uno de los nodos del cluster. Agregaremos su repositorio en la configuración de Rstudio de manera que la descarga de paquetes sea más rápida.
En el menú de Rstudio -> Tools -> Global Options -> Packages , crear un nuevo enlace a repositorio:
3.3.1 Transfrencia de archivos a través de RStudio
Para subir un archivo desde su computador local a su carpeta /home/usuario en el clúster utilice la opción upload del menú “Files” en el panel inferior derecho de RStudio.
Para descargar un archivo desde su carpeta /home/usuario en el clúster hacia su computador local utilice la opción exportar del menú “Files” en el panel inferior derecho de RStudio. Es importante notar que esta opción transfiere el o los archivos originales sin crear una copia adicional (equivalente a mover un archivo)
3.3.2 Transfrencia de archivos a través de Interfaz Gráfica
3.3.2.1 Windows
Se recomienda el uso de WinSCP el cual incorpora el protocolo SCP (Linux y Mac OS lo tienen por defecto para uso por terminal directamente).
Este es el enlace para descarga: https://winscp.net/eng/download.php
Y aquí un tutorial paso a paso para la instalación y configuración de nuevo sitio. Revisar la sección “Transfer files from Windows”: https://research.computing.yale.edu/support/hpc/user-guide/transfer-files-or-cluster
Una vez instalado el programa, estos son los pasos a seguir para configurar nuestra cuenta.
3.3.2.2 Linux-Mac OS
La herammienta Cyberduck es de libre distribución, permite para nuestro caso efectuar transferencia de archivos desde y hacia el clúster de manera segura usando el puerto SFTP. Se puede descargar aquí https://cyberduck.io
3.3.2.3 Transferencia de archivos (Terminal Linux-Mac OS)
A través de la interfaz de terminal es posible usar scp para trasferencia de archivos. La sintaxis general es:
scp source destination
Un ejemplo para transferir archivos del computador local al clúster:
scp archivo.png nombre.apellido@login.hpc.cedia.org.ec:/home/nombre.apellido
y desde el clúster al computador local:
scp nombre.apellido@login.hpc.cedia.org.ec:/home/nombre.apellido/archivo.png /Users/miUsuario/Desktop
Para transferir directorios completos use la opción scp -r
.
3.4 mclapply
Ahora que todos estamos trabajando con Linux como sistema operativo base podemos hacer uso de esta función.
- Fácil de implementar; clona el enviroment automáticamente
- Rápido
- Puede usar solamente los procesadores de un computador
- Usa fork y por lo tanto no funciona en MS Windows
#Default options
mclapply(X, FUN, ...,
mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE)
3.4.1 mclapply vs. lapply
Para los ejemplos a continuación usaremos el archivo dataset.csv
. Este archivo contiene 250000 muestras. En este enlace puede descargar tanto el archivo .csv como el archivo .R a partir del cual fue generado.
Ejemplo usando lapply
###### using lapply
time_lapply_kmeans<-system.time({
data <- read.csv('dataset.csv')
parallel.function <- function(i) {
kmeans( data, centers=4, nstart=i )
}
results <- lapply( c(25, 25, 25, 25), FUN=parallel.function )
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
})
## [1] 133747.2
Ejemplo usando mclapply (2 cores)
### using mclapply
library(parallel)
time_mclapply_kmeans<-system.time({
data <- read.csv('dataset.csv')
parallel.function <- function(i) {
kmeans( data, centers=4, nstart=i )
}
results <- mclapply( c(25, 25, 25, 25), FUN=parallel.function, mc.cores = 2L )
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
})
## [1] 133747.2
Tiempos de respuesta
time_lapply_kmeans
## user system elapsed
## 28.564 0.383 29.041
time_mclapply_kmeans
## user system elapsed
## 16.313 0.527 17.358
3.4.2 Usando multicore en mclapply
El argumento mc.cores
permite modificar el número de cores (procesadores) que se utilizan en el proceso de paralelización. Exploremos el rendimiento de nuestro modelo kmeans con diferentes números de procesadores (2,4,8,16). El código abajo indica el argumento mc.cores = 2L
, ejecutemos el script modificando a las otras posibilidades 4, 8 y 16.
Usemos la sentencia rm(list= ls()[! ls() %in% c("mc_2","mc_4",..)])
al inicio de cada corrida para mantener los valores resultado de system.time previos y al final visualizarlos juntos mandando a imprimir las variables mc_2
, mc_4
, etc.
rm(list=ls(all=TRUE))
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 1561134 83.4 3205452 171.2 3205452 171.2
## Vcells 3015920 23.1 11874215 90.6 14756876 112.6
require(parallel)
time_2cores<-system.time({
data <- read.csv('dataset.csv')
parallel.function <- function(i) {
kmeans( data, centers=4, nstart=i )
}
RNGkind("L'Ecuyer-CMRG")
set.seed(1231231)
mc.reset.stream()
results <- mclapply( rep(5,8), FUN=parallel.function, mc.cores = 2L)
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
})
## [1] 133747.2
Los resultados ejecutados en el cluster usando mclapply para kmeans:
Cores | user | system | elapsed |
---|---|---|---|
2 | 17.694 | 0.802 | 9.644 |
4 | 18.819 | 1.695 | 5.757 |
8 | 20.604 | 2.489 | 3.623 |
16 | 20.948 | 2.679 | 3.818 |
Conclusión :para este ejemplo, únicamente 8 cores es el óptimo pues nuestro vector tiene 8 elementos (usar más cores puede incluso afectar el tiempo de cómputo por la comunicación).
3.4.3 Comparando parLapply (fork y socket)
Vamos a plantear el problema ahora usando las funciones de make cluster
y parLapply
tanto usando PSOCK como FORK. PSOCK es una versión mejorada de SOCK. El código abajo está elaborado para 8 procesadores. Genera nuevamente este código con 2 procesadores para cuantificar la diferencia de tiempos.
rm(list=ls(all=TRUE))
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 1561449 83.4 3205452 171.2 3205452 171.2
## Vcells 3017848 23.1 9499372 72.5 14756876 112.6
require(parallel)
makeFORKCluster8<-system.time({
data <- read.csv( 'dataset.csv' )
parallel.function <- function(i) {
kmeans( data, centers=4, nstart=i )
}
cl <- makeForkCluster(8L) # Equivalente makeCluster(4L, type= "FORK")
clusterSetRNGStream(cl, 123)
#clusterExport(cl, c('data')) -> Fork o requiere exportación de datos
results <- parLapply( cl, rep(5,8), fun=parallel.function )
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
stopCluster(cl)
})
## [1] 133747.2
######## USANDO PSOCK
rm(list= ls()[! ls() %in% c("makeFORKCluster8")])
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 1563441 83.5 3205452 171.2 3205452 171.2
## Vcells 3021600 23.1 9499372 72.5 14756876 112.6
require(parallel)
makePSOCKcluster8<-system.time({
data <- read.csv( 'dataset.csv' )
parallel.function <- function(i) {
kmeans( data, centers=4, nstart=i )
}
cl <- makePSOCKcluster(8L) # Equivalent makeCluster(4L, type= "FORK")
clusterSetRNGStream(cl, 123)
clusterExport(cl, c('data'))
results <- parLapply( cl, rep(5,8), fun=parallel.function )
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
stopCluster(cl)
})
## [1] 133747.2
makeFORKCluster8
## user system elapsed
## 0.995 0.047 18.745
makePSOCKcluster8
## user system elapsed
## 1.062 0.050 18.889
Nota: Seguramente los resultados que acabas de obtener son mucho mejores.
Incorporamos en la tabla los resultados de mclapply obtenidos previamente para propósitos de comparación:
Función | Cores | user | system | elapsed |
---|---|---|---|---|
makePSOCKcluster | 2 | 0.773 | 0.072 | 9.997 |
makeFORKCluster | 2 | 0.787 | 0.060 | 9.954 |
mclapply | 2 | 17.694 | 0.802 | 9.644 |
makePSOCKcluster | 8 | 0.864 | 0.194 | 6.066 |
makeFORKCluster | 8 | 10.781 | 0.094 | 3.899 |
mclapply | 8 | 20.604 | 2.489 | 3.623 |
Algunas cosas a recordar: + mclapply (usa FORK) (clona la sesión R a los procesos hijos) + parlapply requiere creación de cluster (clusterPSOCK - clusterFORK…) + parlapply + clusterFORK es equivalente funcionalmente a mclapply + parlapply paraleliza en múltiples computadas, crea un cluster de procesos que se comunican via TCP/IP.
3.4.4 Números Aleatorios y Experimentos reproducibles
Cuando iniciamos una nueva sesión en R o incluso dentro de la misma sesión el dato referencial para generar valores aleatorios cambia. Es decir, no sería posible replicar experimentos que dependen de valores aleatorios. Por ejemplo:
Cuando se usa mclapply la designación del método para generación de valores aleatorios se hace mediante:
RNGkind("L'Ecuyer-CMRG")
Por defecto dentro de la función mclapply
, el parámetro mc.set.seed = TRUE
. Sin embargo para garantizar la generación de valores aleatorios diferentes para cada core (nodo) se debe seleccionar RNGkind("L'Ecuyer-CMRG")
.
RNGkind("L'Ecuyer-CMRG")
set.seed(7777442)
mc.reset.stream()
unlist(mclapply(1:2, function(i) rnorm(1)))
## [1] -2.0043112 0.9315424
Que sucede si volvemos a ejecutar la sentencia unlist(mclapply(1:2, function(i) rnorm(1)))
-> la función mclapply no modifica la secuencia del valor aleatorio inicial (set.seed) sobre la segunda llamada mclapply.
3.5 Balance de Carga
3.5.1 clusterApplyLB
clusterApplyLB
es una versión de clusterApply que maneja balance de carga.
Cuando la longitud n de x no es mayor que el número de nodos p, entonces el job se envía a n nodos. Caso contratio, los primeros p jobs se envían a los p nodos. Cuando el primer job se ha completado, el próximo job se envía al nodo que acaba de ser liberado y así hasta que se completan todos los jobs.
Usar clusterApplyLB
puede generar una mejor utilización del cluster que clusterApply, no obstante la necesidad de comunicación puede reducir el rendimiento.
Por lo tanto el nodo que ejecuta un job particular no es siempre el mismo, entonces las simulaciones que asignan RNG streams a los nodos no serán reproducibles.
Un ejemplo sencillo para entender como se distribuye el tiempo usando balance de carga:
cl <- parallel::makeCluster(2)
system.time(
parallel::parLapply(cl, 1:4, function(y) { # Sin balance de carga
if (y == 1) {
Sys.sleep(3)
} else {
Sys.sleep(0.5)
}}))
## user system elapsed
## 0.001 0.000 3.504
system.time(
parallel::parLapplyLB(cl, 1:4, function(y) { # Con balance de carga parLapply
if (y == 1) {
Sys.sleep(3)
} else {
Sys.sleep(0.5)
}}))
## user system elapsed
## 0.001 0.000 3.509
system.time(
parallel::clusterApplyLB(cl, 1:4, function(y) { #Con balance de carga clusterApplyLB
if (y == 1) {
Sys.sleep(3)
} else {
Sys.sleep(0.5)
}}))
## user system elapsed
## 0.002 0.001 3.002
parallel::stopCluster(cl)
3.5.2 Cuándo usar balance de carga?
Cuando aplicar la función definida a diferentes elementos de X toma un tiempo de procesamiento bastante variable y, en tanto la función sea determinística o no se requieran resultados reproducibles.
Analicemos la influence de balance de carga en el siguiente ejemplo.
- parLApply : asigna estátimente los chunks de cómputo (procesador no se comunica con master hasta terminar la tarea)
- clusterApply: asigna un chunk y espera por la comunicación de los procesadores para enviar el siguiente.
- clusterApplyLB: recicla los nodos, es decir cada vez que un nodo está libre le asigna una nueva tarea. La asignación de los chunks de cómputo es dinámica.
require(snow)
## Loading required package: snow
##
## Attaching package: 'snow'
## The following objects are masked from 'package:parallel':
##
## clusterApply, clusterApplyLB, clusterCall, clusterEvalQ,
## clusterExport, clusterMap, clusterSplit, makeCluster,
## parApply, parCapply, parLapply, parRapply, parSapply,
## splitIndices, stopCluster
set.seed(1237)
sleep = sample(1:10,10)
spec = 4
cl = makeCluster(spec, type="SOCK")
#Los chunks
sleep
## [1] 8 6 2 10 7 9 4 3 5 1
clusterSplit(cl, sleep)
## [[1]]
## [1] 8 6 2
##
## [[2]]
## [1] 10 7
##
## [[3]]
## [1] 9 4
##
## [[4]]
## [1] 3 5 1
st = snow.time(clusterApply(cl, sleep, Sys.sleep))
stLB = snow.time(clusterApplyLB(cl, sleep, Sys.sleep))
stPL = snow.time(parLapply(cl, sleep, Sys.sleep))
stPL = snow.time(parLapply(cl, sleep, Sys.sleep))
stopCluster(cl)
par(mfrow=c(3,1),mar=rep(2,4))
plot(st, title="clusterApply")
plot(stLB, title="clusterApplyLB")
plot(stPL, title="parLapply")
Un ejemplo para probar el balance de carga. ¿En cuánto hemos reducido el tiempo de cómputo?.
rm(list=ls(all=TRUE))
gc()
require(parallel)
data <- read.csv( 'dataset.csv' )
parallel.function <- function(i) {
kmeans( data, centers=4, nstart=i )
}
cl <- parallel::makeForkCluster(4L) # Equivalente makeCluster(4L, type= "FORK")
clusterSetRNGStream(cl, 123)
time_parLapply<-system.time({
results <- parallel::parLapply( cl, c(1,1,5,5,2,2,1,1), fun=parallel.function )
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
})
time_parLapplyLB<-system.time({
results <- parLapplyLB( cl, c(1,1,5,5,2,2,1,1), fun=parallel.function )
temp.vector <- sapply( results, function(result) { result$tot.withinss } )
result <- results[[which.min(temp.vector)]]
print(result$tot.withinss)
})
stopCluster(cl)
time_parLapply
time_parLapplyLB