4 Paralelización con R (rslurm)

4.1 SLURM

Página oficial de SLURM https://slurm.schedmd.com

SLURM Workload Manager o formalmente (Simple Linux Utility for Resource Management) es un sistema de manejo de cluster y calendarización de tareas para grandes y pequeños cluster Linux, el cual es open source, tolerante a fallas y altamente escalable.

SLURM cumple tres funciones principales:

  1. Asigna acceso exclusivo y/o no exclusivo a recursos (nodos de cómputo) a usuarios por un tiempo determinado para que estos puedan ejecutar una tarea.

  2. Provee un framework para iniciar, ejecutar y monitorear tareas (normalmente tareas paralelas) en un conjunto de nodos asignados.

  3. Coordina la solicitud de recursos a través de una cola de tareas pendientes.

¿Cómo se encarga Slurm de manejar los jobs?

  • Asignando recursos de computador solicitados por el job
  • Ejecutando el job y
  • Reportanto la salida de la ejecución al usuario.

Ejecutar un job requiere al menos los siguientes pasos:

  • Preparar un script para someterlo y
  • Someter el job para ejecución.

Unos enlaces con descripciones de slurm para usuarios no expertos. https://hpc.uni.lu/users/docs/slurm.html#slurm-overview http://www.arc.ox.ac.uk/content/slurm-job-scheduler

4.2 Comandos básicos de Slurm

Comando Descripción
squeue ver información de jobs en cola
sinfo ver cola, partición e información de nodo
sbatch someter un job a través de ejecución batch (scripted)
srun someter un job interactivo
scancel cancelar jobs en cola
scontrol control e información detallada de jobs, colas y particiones.
sstat ver a nivel de sistema la utilización de recursos (memoria, I/O, energía)
sacct ver a nivel de sistema la utilización de recursos de jobs completados.



Algunos ejemplos de uso de comandos

Acción Comando SLURM
Someter un job en batch (forma pasiva) sbatch $script
Iniciar un job interactivo srun –pty bash -i
Estado de la cola de jobs squeue
Estado de la cola de un usuario squeue -u $username
Estado detallado de job específico scontrol show job $jobid
Eliminar un job en estado (running/waiting) job scancel $jobid
Pausar el job para control scontrol hold $jobid
Reactivar el job en control scontrol release $jobid
Lista de nodos y sus propiedades scontrol show nodes



Más ejemplos útiles (para usuarios inexpertos con terminales)

  1. Someter un job para ejecución pasiva
sbatch myscript.sh
  1. Evaluar un job e identificar cuando se estima que este se puede ejecutar en el cluster (este comando no somete el job)
sbatch --test-only myscript.sh
  1. Listar los jobs de un usuario específico
squeue -u $username
  1. Listar información detallada de un job (útil para identificar problemas)
scontrol show jobid -dd $jobid
  1. Listar la información de estado de un job ejecutándose actualmente
sstat --format=AveCPU,AvePages,AveRSS,AveVMSize,JobID -j $jobid --allsteps
  1. Cancelar un job
scancel $jobid
  1. Cancelar todos los jobs de un usuario
scancel -u $username
  1. Cancelar todos los jobs pendientes de un usuario
scancel -t PENDING -u $username

Una larga lista de ejemplos en https://www.rc.fas.harvard.edu/resources/documentation/convenient-slurm-commands/

4.3 Librería rslurm

Un caso muy común de procesos intensivos en R están relacionados a evaluaciones repetitivas sobre varios items o conjuntos de parámetros.

El paquete rslurm simplifica el proceso de distribuión de este tipo de cálculos sobre clusters que usan el “Slurm worload manager”. La función principal, slurm_apply, automáticamente divide el cómputo sobre múltiples nodos y escribe los scripts a someter (submit). Además incluye funciones para recuperar y combinar los resultados de los diferentes nodos así como “wrappers” de comandos Slurm.

4.3.1 Pasos a seguir

  1. Cargar librería rslurm
  2. Creación de funciones a usar sobre el conjunto de parámetros. Debe existir al menos una función principal.
  3. Creación de data frame de parámetros
  4. Llamada a crear y (alternativamente) ejecutar el job usando a slurm -> slurm_apply
  5. Retornar los resultados -> get_slurm_out
  6. Eliminar (limpiar) archivos generados en el proceso -> cleanup_files

Sintetizando, requerimos definir una función de llamada para rslurm y un dataframe que contiene los argumentos de dicha función, donde cada fila corresponde a un conjunto de argumentos para una ejecución.

4.3.2 ¿Qué pasa cuando se ejecuta el paso correspondiente a slurm_apply siguiendo los pasos anteriores?

Se crea una carpeta específica del job. Esta contiene los parámetros en un archivo .RDS, y (si aplica) todos los objetos especificados en el argumento add_objects almacenados en un archivo .RData.

Se genera un script .R (slurm_run.R) el cual se va a ejecutar en cada nodo del cluster así como un script Bash (submit.sh) que permite someter el job a Slurm.

El script Bash ordena a Slurm crear un job array y el script R aprovecha la variable SLURM_ARRAY_TASK_ID que Slurm configurará para cada nodo del cluster. Entonces esta variable es leída por slurm_run.R, lo cual permite a cada instancia del script operar con un subconjunto diferente de datos y generar su archivo de resultados individual por nodo.

El script R hace una llamada a la función parallel::mcMap para ejecutar la paralelización en cada nodo.

4.3.3 Sobre las funciones de rslurm

slurm_apply incorpora los argumentos add_objects y slurm_options que pueden ser de especial interés. El primero permite enviar variables y funciones a los procesos hijos y el segundo permite incluir opciones específicas de slurm respecto a la función sbatch la cual ejecutará el job.

La página oficial de SLURM respecto a los parámetros de la función sbatch https://slurm.schedmd.com/sbatch.html

print_job_status visualiza el estado del job en la cola o indica si se ha completado.

4.4 Ejemplos

El ejemplo base se explica de acuerdo a los pasos indicados en la sección 4.3.1.

Función que toma dos argumentos (a) media y (b) desviación estándar) como parámetros y genera un millón de muestras y retorna la media y desviación estándar de la muestra.

test_func <- function(par_mu, par_sd) {
    samp <- rnorm(10^6, par_mu, par_sd)
    c(s_mu = mean(samp), s_sd = sd(samp))
}

Un data frame que almacena los conjuntos de parámetros. Es importante recordar que cada parámetro/argumento debe corresponder a una columna del data frame.

pars <- data.frame(par_mu = 1:10,
                   par_sd = seq(0.1, 1, length.out = 10))
head(pars, 3)
##   par_mu par_sd
## 1      1    0.1
## 2      2    0.2
## 3      3    0.3

Para llamar a ejecutar la creación del job y su calendarización (en el caso de que submit = TRUE) se hace uso de la función slurm_apply:

library(rslurm)
sjob <- slurm_apply(test_func, pars, jobname = 'test_apply',
                    nodes = 2, cpus_per_node = 2, 
                    slurm_options = list(exclude = "compute-0-6"),
                    submit = TRUE)
sjob

print_job_status(sjob)

Cuando se ha ejecutado el job, sea manual (submit = FALSE) o automáticamente (submit = TRUE), es posible recuperar la lista de los nombres de archivos generados por cada nodo.

list.files('_rslurm_test_apply', 'results')

Los resultados de todos los nodos se pueden leer ahora usando la función get_slurm_out().

res <- get_slurm_out(sjob, outtype = 'table') 
head(res, 3)

#Para resultados más complejos use "raw"
res_raw <- get_slurm_out(sjob, outtype = 'raw')
res_raw[1:3]

Finalmente, borrar todos los archivos que se han generado en mi carpeta de job.

cleanup_files(sjob)

IMPORTANTE: Para definir las opciones de slurm en el argumento slurm_options es indispensable explorar como se encuentra la cola de jobs usando el comando squeue.



El código completo del ejemplo base

library(rslurm)

#FUNCION
test_func <- function(par_mu, par_sd) {
  samp <- rnorm(10^6, par_mu, par_sd)
  c(s_mu = mean(samp), s_sd = sd(samp))
  }

#PARAMETROS
pars <- data.frame(par_mu = 1:10,
                   par_sd = seq(0.1, 1, length.out = 10))
head(pars, 3)

#CREACION DE JOB
sjob <- slurm_apply(test_func, pars, jobname = 'test_apply',
                    nodes = 1, cpus_per_node = 2, 
                    slurm_options = list(exclude = "compute-0-6"),
                    submit = TRUE)
sjob
print_job_status(sjob)

list.files('_rslurm_test_apply', 'results')

#RECUPERACION DE RESULTADOS
res <- get_slurm_out(sjob, outtype = 'table') 
head(res, 3)

res_raw <- get_slurm_out(sjob, outtype = 'raw')
res_raw[1:3]

#LIMPIEZA DE ARCHIVOS
cleanup_files(sjob)



Un ejemplo con kmeans pasando objetos adicionales

library(rslurm)

data <- read.csv('dataset.csv')

myFunction <- function(i, j) {
  kmeans( data, centers=i, nstart=j )
}

myiter <- 3
nstarts <- rep(25, myiter)
nclus <- 2:11
params <- expand.grid(j=nstarts,i=nclus) 
params

# Objetos adicionales "data"
sjob <- slurm_apply(myFunction, params, nodes = 1, cpus_per_node = 8, 
                    add_objects = "data", slurm_options = list(exclude = "compute-0-6"))

# Estado de ejecución del job
print_job_status(sjob)

# Recuperar resutados de archivos y combinarlos en un solo data frame 
results <- get_slurm_out(sjob, outtype = 'raw')
summary(results)

# Eliminar archivos temporales generados por slurm
cleanup_files(sjob)



Ejemplo de validación cruzada adaptado

n <- 100
set.seed(123)
x <- rnorm(n)
y <- x + rnorm(n)

data <- data.frame(x, y)
K <- 10
samples <- split(sample(1:n), rep(1:K, length = n))

#Crear dataframe de parametros 
params <- data.frame("index" = seq(along = samples))

cv.fold.fun <- function(index){
  fit <- lm(y~x, data = data[-samples[[index]],])
  pred <- predict(fit, newdata = data[samples[[index]],]) 
  return((pred - data$y[samples[[index]]])**2)
}

#CREACION DE JOB
sjob <- slurm_apply(cv.fold.fun, params, jobname = '_crossval',
                    nodes = 1, cpus_per_node = 8, 
                    add_objects = c("samples","data"),
                    slurm_options = list(exclude = "compute-0-6"),
                    submit = TRUE)

print_job_status(sjob)

list.files('_rslurm_test_apply', 'results')

#RECUPERACION DE RESULTADOS
res <- get_slurm_out(sjob, outtype = 'table') 
head(res, 3)

res_raw <- get_slurm_out(sjob, outtype = 'raw')
res_raw[1:3]

#LIMPIEZA DE ARCHIVOS
cleanup_files(sjob)