第 5 章 数据搬运工

导入数据与导出数据,各种数据格式,数据库

5.1 导入数据文件

Base R 针对不同的数据格式文件,提供了大量的数据导入和导出函数,不愧是专注数据分析20余年的优秀统计软件。 除了函数 write.ftableread.ftable 来自 stats 包,都来自 base 和 utils 包

# 当前环境的搜索路径
searchpaths()
#> [1] ".GlobalEnv"                                  
#> [2] "C:/Program Files/R/R-3.6.0/library/stats"    
#> [3] "C:/Program Files/R/R-3.6.0/library/graphics" 
#> [4] "C:/Program Files/R/R-3.6.0/library/grDevices"
#> [5] "C:/Program Files/R/R-3.6.0/library/utils"    
#> [6] "C:/Program Files/R/R-3.6.0/library/datasets" 
#> [7] "C:/Program Files/R/R-3.6.0/library/methods"  
#> [8] "Autoloads"                                   
#> [9] "C:/PROGRA~1/R/R-36~1.0/library/base"
# 返回匹配结果及其所在路径的编号
apropos("^(read|write)", where = TRUE, mode = "function")
#>                  5                  5                  9                  5 
#>         "read.csv"        "read.csv2"         "read.dcf"       "read.delim" 
#>                  5                  5                  5                  2 
#>      "read.delim2"         "read.DIF"     "read.fortran"      "read.ftable" 
#>                  5                  5                  5                  9 
#>         "read.fwf"      "read.socket"       "read.table"          "readBin" 
#>                  9                  5                  5                  9 
#>         "readChar" "readCitationFile"    "readClipboard"         "readline" 
#>                  9                  9                  5                  9 
#>        "readLines"          "readRDS"     "readRegistry"     "readRenviron" 
#>                  9                  5                  5                  9 
#>            "write"        "write.csv"       "write.csv2"        "write.dcf" 
#>                  2                  5                  5                  9 
#>     "write.ftable"     "write.socket"      "write.table"         "writeBin" 
#>                  9                  5                  9 
#>        "writeChar"   "writeClipboard"       "writeLines"

5.1.1 scan

scan(file = "", what = double(), nmax = -1, n = -1, sep = "",
     quote = if(identical(sep, "\n")) "" else "'\"", dec = ".",
     skip = 0, nlines = 0, na.strings = "NA",
     flush = FALSE, fill = FALSE, strip.white = FALSE,
     quiet = FALSE, blank.lines.skip = TRUE, multi.line = TRUE,
     comment.char = "", allowEscapes = FALSE,
     fileEncoding = "", encoding = "unknown", text, skipNul = FALSE)

首先让我们用 cat 函数创建一个练习数据集 ex.data

cat("TITLE extra line", "2 3 5 7", "11 13 17")
#> TITLE extra line 2 3 5 7 11 13 17
cat("TITLE extra line", "2 3 5 7", "11 13 17", file = "data/ex.data", sep = "\n")

以此练习数据集,介绍 scan 函数最常用的参数

scan("data/ex.data")
#> Error in scan("data/ex.data"): scan() expected 'a real', got 'TITLE'

从上面的报错信息,我们发现 scan 函数只能读取同一类型的数据,如布尔型 logical, 整型 integer,数值型 numeric(double), 复数型 complex,字符型 character,raw 和列表 list。所以我们设置参数 skip = 1 把第一行跳过,就成功读取了数据

scan("data/ex.data", skip = 1)
#> [1]  2  3  5  7 11 13 17

如果设置参数 quiet = TRUE 就不会报告读取的数据量

scan("data/ex.data", skip = 1, quiet = TRUE)
#> [1]  2  3  5  7 11 13 17

参数 nlines = 1 表示只读取一行数据

scan("data/ex.data", skip = 1, nlines = 1) # only 1 line after the skipped one
#> [1] 2 3 5 7

默认参数 flush = TRUE 表示读取最后一个请求的字段后,刷新到行尾,下面对比一下读取的结果

scan("data/ex.data", what = list("", "", "")) # flush is F -> read "7"
#> Warning in scan("data/ex.data", what = list("", "", "")): number of items read
#> is not a multiple of the number of columns
#> [[1]]
#> [1] "TITLE" "2"     "7"     "17"   
#> 
#> [[2]]
#> [1] "extra" "3"     "11"    ""     
#> 
#> [[3]]
#> [1] "line" "5"    "13"   ""
scan("data/ex.data", what = list("", "", ""), flush = TRUE)
#> [[1]]
#> [1] "TITLE" "2"     "11"   
#> 
#> [[2]]
#> [1] "extra" "3"     "13"   
#> 
#> [[3]]
#> [1] "line" "5"    "17"

临时文件 ex.data 用完了,我们调用 unlink 函数将其删除,以免留下垃圾文件

unlink("data/ex.data") # tidy up

5.1.2 read.table

read.table(file, header = FALSE, sep = "", quote = "\"'",
           dec = ".", numerals = c("allow.loss", "warn.loss", "no.loss"),
           row.names, col.names, as.is = !stringsAsFactors,
           na.strings = "NA", colClasses = NA, nrows = -1,
           skip = 0, check.names = TRUE, fill = !blank.lines.skip,
           strip.white = FALSE, blank.lines.skip = TRUE,
           comment.char = "#",
           allowEscapes = FALSE, flush = FALSE,
           stringsAsFactors = default.stringsAsFactors(),
           fileEncoding = "", encoding = "unknown", text, skipNul = FALSE)

read.csv(file, header = TRUE, sep = ",", quote = "\"",
         dec = ".", fill = TRUE, comment.char = "", ...)

read.csv2(file, header = TRUE, sep = ";", quote = "\"",
          dec = ",", fill = TRUE, comment.char = "", ...)

read.delim(file, header = TRUE, sep = "\t", quote = "\"",
           dec = ".", fill = TRUE, comment.char = "", ...)

read.delim2(file, header = TRUE, sep = "\t", quote = "\"",
            dec = ",", fill = TRUE, comment.char = "", ...)

变量名是不允许以下划线开头的,同样在数据框里,列名也不推荐使用下划线开头。默认情况下,read.table 都会通过参数 check.names 检查列名的有效性,该参数实际调用了函数 make.names 去检查。如果想尽量保持数据集原来的样子可以设置参数 check.names = FALSE, stringsAsFactors = FALSE。 默认情形下,read.table 还会将字符串转化为因子变量,这是 R 的历史原因,作为一门统计学家的必备语言,在统计模型中,字符常用来描述类别,而类别变量在 R 环境中常用因子类型来表示,而且大量内置的统计模型也是将它们视为因子变量,如 lmglm

dat1 = read.table(header = TRUE, check.names = TRUE, text = "
_a _b _c
1 2 a1
3 4 a2
")
dat1
#>   X_a X_b X_c
#> 1   1   2  a1
#> 2   3   4  a2
dat2 = read.table(header = TRUE, check.names = FALSE, text = "
_a _b _c
1 2 a1
3 4 a2
")
dat2
#>   _a _b _c
#> 1  1  2 a1
#> 2  3  4 a2
dat3 = read.table(header = TRUE, check.names = FALSE, stringsAsFactors = FALSE, text = "
_a _b _c
1 2 a1
3 4 a2
")
dat3
#>   _a _b _c
#> 1  1  2 a1
#> 2  3  4 a2

5.1.3 readLines

readLines(con = stdin(), n = -1L, ok = TRUE, warn = TRUE,
          encoding = "unknown", skipNul = FALSE)

让我们折腾一波,读进来又写出去,只有 R 3.5.3 以上才能保持原样的正确输入输出

writeLines(readLines("latex/TeXLive.pkgs"),"latex/TeXLive.pkgs")

这次我们创建一个真的临时文件,因为重新启动 R 这个文件和文件夹就没有了,回收掉了

fil <- tempfile(fileext = ".data")
cat("TITLE extra line", "2 3 5 7", "", "11 13 17", file = fil,
    sep = "\n")
fil
#> [1] "C:\\Users\\xy-huang\\AppData\\Local\\Temp\\Rtmpgf5xhe\\file5d87bb817b1.data"

设置参数 n = -1 表示将文件 fil 的内容从头读到尾

readLines(fil, n = -1)
#> [1] "TITLE extra line" "2 3 5 7"          ""                 "11 13 17"

作为拥有良好习惯的 R 用户,这种垃圾文件最好用后即焚

unlink(fil) # tidy up

再举个例子,我们创建一个新的临时文件 fil,文件内容只有

cat("123\nabc")
#> 123
#> abc
fil <- tempfile("test")
cat("123\nabc", file = fil, append = TRUE)
fil
#> [1] "C:\\Users\\xy-huang\\AppData\\Local\\Temp\\Rtmpgf5xhe\\test5d81faf6d11"
readLines(fil)
#> Warning in readLines(fil): incomplete final line found on 'C:\Users\xy-
#> huang\AppData\Local\Temp\Rtmpgf5xhe\test5d81faf6d11'
#> [1] "123" "abc"

这次读取文件的过程给出了警告,原因是 fil 没有以空行结尾,warn = TRUE 表示这种情况要给出警告,如果设置参数 warn = FALSE 就没有警告。我们还是建议大家尽量遵循规范。

再举一个例子,从一个连接读取数据,建立连接的方式有很多,参见 ?file,下面设置参数 blocking

con <- file(fil, "r", blocking = FALSE)
readLines(con)
#> [1] "123"

cat(" def\n", file = fil, append = TRUE)
readLines(con)
#> [1] "abc def"
# 关闭连接
close(con)
# 清理垃圾文件
unlink(fil)

5.1.4 readRDS

序列化数据操作,Mark Klik 开发的 fstTravers Ching 开发的 qs, Hadley Wickham 开发的 feather 包实现跨语言环境快速的读写数据

表 5.1: fst 序列化数据框对象性能比较 BaseR、 data.table 和 feather15
Method Format Time (ms) Size (MB) Speed (MB/s) N
readRDS bin 1577 1000 633 112
saveRDS bin 2042 1000 489 112
fread csv 2925 1038 410 232
fwrite csv 2790 1038 358 241
read_feather bin 3950 813 253 112
write_feather bin 1820 813 549 112
read_fst bin 457 303 2184 282
write_fst bin 314 303 3180 291

目前比较好的是 qs 和 fst 包

5.2 保存输出结果

capture.output(..., file = NULL, append = FALSE,
               type = c("output", "message"), split = FALSE)

capture.output 将一段R代码执行结果,保存到文件,参数为表达式。capture.outputsink 的关系相当于 withattach 的关系。

glmout <- capture.output(summary(glm(case ~ spontaneous + induced,
  data = infert, family = binomial()
)), file = "data/capture.txt")
capture.output(1 + 1, 2 + 2)
#> [1] "[1] 2" "[1] 4"
capture.output({
  1 + 1
  2 + 2
})
#> [1] "[1] 4"

sink 函数将控制台输出结果保存到文件,只将 outer 函数运行的结果保存到 ex-sink.txt 文件,outer 函数计算的是直积,在这里相当于 seq(10) %*% t(seq(10)),而在 R 语言中,更加有效的计算方式是 tcrossprod(seq(10),seq(10))

sink("data/ex-sink.txt")
i <- 1:10
outer(i, i, "*") 
#>       [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9] [,10]
#>  [1,]    1    2    3    4    5    6    7    8    9    10
#>  [2,]    2    4    6    8   10   12   14   16   18    20
#>  [3,]    3    6    9   12   15   18   21   24   27    30
#>  [4,]    4    8   12   16   20   24   28   32   36    40
#>  [5,]    5   10   15   20   25   30   35   40   45    50
#>  [6,]    6   12   18   24   30   36   42   48   54    60
#>  [7,]    7   14   21   28   35   42   49   56   63    70
#>  [8,]    8   16   24   32   40   48   56   64   72    80
#>  [9,]    9   18   27   36   45   54   63   72   81    90
#> [10,]   10   20   30   40   50   60   70   80   90   100
sink()

5.3 保存数据对象

load(file, envir = parent.frame(), verbose = FALSE)

save(..., list = character(),
     file = stop("'file' must be specified"),
     ascii = FALSE, version = NULL, envir = parent.frame(),
     compress = isTRUE(!ascii), compression_level,
     eval.promises = TRUE, precheck = TRUE)

save.image(file = ".RData", version = NULL, ascii = FALSE,
           compress = !ascii, safe = TRUE)

loadsave 函数加载或保存包含工作环境信息的数据对象,save.image 保存当前工作环境到磁盘,即保存工作空间中所有数据对象,数据格式为 .RData,即相当于

save(list = ls(all.names = TRUE), file = ".RData", envir = .GlobalEnv)

dump 保存数据对象 AirPassengers 到文件 AirPassengers.txt,文件内容是 R 命令,可把AirPassengers.txt看作代码文档执行,dput 保存数据对象内容到文件AirPassengers.dat,文件中不包含变量名 AirPassengers。注意到 dump 输入是一个字符串,而 dput 要求输入数据对象的名称,source 函数与 dump 对应,而 dgetdput对应。

# 加载数据
data(AirPassengers, package = "datasets")
# 将数据以R代码块的形式保存到文件
dump('AirPassengers', file = 'data/AirPassengers.txt') 
# source(file = 'data/AirPassengers.txt')

接下来,我们读取 AirPassengers.txt 的文件内容,可见它是一段完整的 R 代码,可以直接复制到 R 的控制台中运行,并且得到一个与原始 AirPassengers 变量一样的结果

cat(readLines('data/AirPassengers.txt'), sep = "\n")
#> AirPassengers <-
#> structure(c(112, 118, 132, 129, 121, 135, 148, 148, 136, 119, 
#> 104, 118, 115, 126, 141, 135, 125, 149, 170, 170, 158, 133, 114, 
#> 140, 145, 150, 178, 163, 172, 178, 199, 199, 184, 162, 146, 166, 
#> 171, 180, 193, 181, 183, 218, 230, 242, 209, 191, 172, 194, 196, 
#> 196, 236, 235, 229, 243, 264, 272, 237, 211, 180, 201, 204, 188, 
#> 235, 227, 234, 264, 302, 293, 259, 229, 203, 229, 242, 233, 267, 
#> 269, 270, 315, 364, 347, 312, 274, 237, 278, 284, 277, 317, 313, 
#> 318, 374, 413, 405, 355, 306, 271, 306, 315, 301, 356, 348, 355, 
#> 422, 465, 467, 404, 347, 305, 336, 340, 318, 362, 348, 363, 435, 
#> 491, 505, 404, 359, 310, 337, 360, 342, 406, 396, 420, 472, 548, 
#> 559, 463, 407, 362, 405, 417, 391, 419, 461, 472, 535, 622, 606, 
#> 508, 461, 390, 432), .Tsp = c(1949, 1960.91666666667, 12), class = "ts")

dput 函数类似 dump 函数,保存数据对象到磁盘文件

dput(AirPassengers, file = 'data/AirPassengers.dat')
# AirPassengers <- dget(file = 'data/AirPassengers.dat')

同样地,现在我们观察 dput 函数保存的文件 AirPassengers.dat 内容,和dump 函数保存的文件 AirPassengers.txt相比,就缺一个赋值变量

cat(readLines('data/AirPassengers.dat'), sep = "\n")
#> structure(c(112, 118, 132, 129, 121, 135, 148, 148, 136, 119, 
#> 104, 118, 115, 126, 141, 135, 125, 149, 170, 170, 158, 133, 114, 
#> 140, 145, 150, 178, 163, 172, 178, 199, 199, 184, 162, 146, 166, 
#> 171, 180, 193, 181, 183, 218, 230, 242, 209, 191, 172, 194, 196, 
#> 196, 236, 235, 229, 243, 264, 272, 237, 211, 180, 201, 204, 188, 
#> 235, 227, 234, 264, 302, 293, 259, 229, 203, 229, 242, 233, 267, 
#> 269, 270, 315, 364, 347, 312, 274, 237, 278, 284, 277, 317, 313, 
#> 318, 374, 413, 405, 355, 306, 271, 306, 315, 301, 356, 348, 355, 
#> 422, 465, 467, 404, 347, 305, 336, 340, 318, 362, 348, 363, 435, 
#> 491, 505, 404, 359, 310, 337, 360, 342, 406, 396, 420, 472, 548, 
#> 559, 463, 407, 362, 405, 417, 391, 419, 461, 472, 535, 622, 606, 
#> 508, 461, 390, 432), .Tsp = c(1949, 1960.91666666667, 12), class = "ts")

5.4 其它数据格式

jsonlite 读取 *.json 格式的文件,jsonlite::write_json 函数将 R对象保存为 JSON 文件,jsonlite::fromJSON 将 json 字符串或文件转化为 R 对象,jsonlite::toJSON 函数正好与之相反

library(jsonlite)
jsonlite::read_json(path = "path/to/filename.json")

yaml 包读取 *.yml 格式文件,返回一个列表,yaml::write_yaml 函数将 R 对象写入 yaml 格式

library(yaml)
yaml::read_yaml(file = '_bookdown.yml')
#> $book_filename
#> [1] "RGraphics"
#> 
#> $delete_merged_file
#> [1] TRUE
#> 
#> $language
#> $language$label
#> $language$label$fig
#> [1] "图 "
#> 
#> $language$label$tab
#> [1] "表 "
#> 
#> 
#> $language$ui
#> $language$ui$edit
#> [1] "编辑"
#> 
#> $language$ui$chapter_name
#> [1] "第 " " 章"
#> 
#> 
#> 
#> $output_dir
#> [1] "_book"
#> 
#> $new_session
#> [1] TRUE
#> 
#> $before_chapter_script
#> [1] "_common.R"
表 5.2: 导入来自其它数据分析软件产生的数据集
统计软件 R函数 R包
ERSI ArcGIS read.shapefile shapefiles
Matlab readMat R.matlab
minitab read.mtp foreign
SAS (permanent data) read.ssd foreign
SAS (XPORT format) read.xport foreign
SPSS read.spss foreign
Stata read.dta foreign
Systat read.systat foreign
Octave read.octave foreign
表 5.3: 导入来自其它格式的数据集
文件格式 R函数 R包
列联表数据 read.ftable stats
二进制数据 readBin base
字符串数据 readChar base
剪贴板数据 readClipboard utils

read.dcf 函数读取 Debian 控制格式文件,这种类型的文件以人眼可读的形式在存储数据,如 R 包的 DESCRIPTION 文件或者包含所有 CRAN 上 R 包描述的文件 https://cran.r-project.org/src/contrib/PACKAGES

x <- read.dcf(file = system.file("DESCRIPTION", package = "splines"),
              fields = c("Package", "Version", "Title"))
x
#>      Package   Version Title                                    
#> [1,] "splines" "3.6.0" "Regression Spline Functions and Classes"

最后要提及拥有瑞士军刀之称的 rio 包,它集合了当前 R 可以读取的所有统计分析软件导出的数据。

5.5 批量导入数据

library(tidyverse)
#> Registered S3 methods overwritten by 'ggplot2':
#>   method         from 
#>   [.quosures     rlang
#>   c.quosures     rlang
#>   print.quosures rlang
read_list <- function(list_of_datasets, read_func) {
  read_and_assign <- function(dataset, read_func) {
    dataset_name <- as.name(dataset)
    dataset_name <- read_func(dataset)
  }

  # invisible is used to suppress the unneeded output
  output <- invisible(
    sapply(list_of_datasets,
      read_and_assign,
      read_func = read_func, simplify = FALSE, USE.NAMES = TRUE
    )
  )

  # Remove the extension at the end of the data set names
  names_of_datasets <- c(unlist(strsplit(list_of_datasets, "[.]"))[c(T, F)])
  names(output) <- names_of_datasets
  return(output)
}

批量导入文件扩展名为 .csv 的数据文件,即逗号分割的文件

data_files <- list.files(path = "path/to/csv/dir",pattern = ".csv", full.names = TRUE)
print(data_files)

相比于 Base R 提供的 read.csv 函数,使用 readr 包的 read_csv 函数可以更快地读取csv格式文件,特别是在读取GB级数据文件时,效果特别明显。

list_of_data_sets <- read_list(data_files, readr::read_csv)

使用 tibble 包的glimpse函数可以十分方便地对整个数据集有一个大致的了解,展示方式和信息量相当于 strhead 函数

tibble::glimpse(list_of_data_sets)

5.6 批量导出数据

假定我们有一个列表,其每个元素都是一个数据框,现在要把每个数据框分别存入 xlsx 表的工作薄中,以 mtcars 数据集为例,将其按分类变量 cyl 分组拆分,获得一个列表 list

dat <- split(mtcars, mtcars$cyl)
dat
#> $`4`
#>                 mpg cyl  disp  hp drat   wt qsec vs am gear carb
#> Datsun 710     22.8   4 108.0  93 3.85 2.32 18.6  1  1    4    1
#> Merc 240D      24.4   4 146.7  62 3.69 3.19 20.0  1  0    4    2
#> Merc 230       22.8   4 140.8  95 3.92 3.15 22.9  1  0    4    2
#> Fiat 128       32.4   4  78.7  66 4.08 2.20 19.5  1  1    4    1
#> Honda Civic    30.4   4  75.7  52 4.93 1.61 18.5  1  1    4    2
#> Toyota Corolla 33.9   4  71.1  65 4.22 1.83 19.9  1  1    4    1
#> Toyota Corona  21.5   4 120.1  97 3.70 2.46 20.0  1  0    3    1
#> Fiat X1-9      27.3   4  79.0  66 4.08 1.94 18.9  1  1    4    1
#> Porsche 914-2  26.0   4 120.3  91 4.43 2.14 16.7  0  1    5    2
#> Lotus Europa   30.4   4  95.1 113 3.77 1.51 16.9  1  1    5    2
#> Volvo 142E     21.4   4 121.0 109 4.11 2.78 18.6  1  1    4    2
#> 
#> $`6`
#>                 mpg cyl disp  hp drat   wt qsec vs am gear carb
#> Mazda RX4      21.0   6  160 110 3.90 2.62 16.5  0  1    4    4
#> Mazda RX4 Wag  21.0   6  160 110 3.90 2.88 17.0  0  1    4    4
#> Hornet 4 Drive 21.4   6  258 110 3.08 3.21 19.4  1  0    3    1
#> Valiant        18.1   6  225 105 2.76 3.46 20.2  1  0    3    1
#> Merc 280       19.2   6  168 123 3.92 3.44 18.3  1  0    4    4
#> Merc 280C      17.8   6  168 123 3.92 3.44 18.9  1  0    4    4
#> Ferrari Dino   19.7   6  145 175 3.62 2.77 15.5  0  1    5    6
#> 
#> $`8`
#>                      mpg cyl disp  hp drat   wt qsec vs am gear carb
#> Hornet Sportabout   18.7   8  360 175 3.15 3.44 17.0  0  0    3    2
#> Duster 360          14.3   8  360 245 3.21 3.57 15.8  0  0    3    4
#> Merc 450SE          16.4   8  276 180 3.07 4.07 17.4  0  0    3    3
#> Merc 450SL          17.3   8  276 180 3.07 3.73 17.6  0  0    3    3
#> Merc 450SLC         15.2   8  276 180 3.07 3.78 18.0  0  0    3    3
#> Cadillac Fleetwood  10.4   8  472 205 2.93 5.25 18.0  0  0    3    4
#> Lincoln Continental 10.4   8  460 215 3.00 5.42 17.8  0  0    3    4
#> Chrysler Imperial   14.7   8  440 230 3.23 5.34 17.4  0  0    3    4
#> Dodge Challenger    15.5   8  318 150 2.76 3.52 16.9  0  0    3    2
#> AMC Javelin         15.2   8  304 150 3.15 3.44 17.3  0  0    3    2
#> Camaro Z28          13.3   8  350 245 3.73 3.84 15.4  0  0    3    4
#> Pontiac Firebird    19.2   8  400 175 3.08 3.85 17.1  0  0    3    2
#> Ford Pantera L      15.8   8  351 264 4.22 3.17 14.5  0  1    5    4
#> Maserati Bora       15.0   8  301 335 3.54 3.57 14.6  0  1    5    8

将 xlsx 表格初始化,创建空白的工作薄

## 加载 openxlsx 包
library('openxlsx')
## 创建空白的工作薄
wb <- createWorkbook()

将列表里的每张表分别存入 xlsx 表格的每个 worksheet,worksheet 的名字就是分组变量的名字

Map(function(data, name){
    addWorksheet(wb, name)
    writeData(wb, name, data)
 
}, dat, names(dat))
#> $`4`
#> [1] 0
#> 
#> $`6`
#> [1] 0
#> 
#> $`8`
#> [1] 0

最后保存数据到磁盘,见图 5.1

saveWorkbook(wb, file = "data/matcars.xlsx", overwrite = TRUE)
#> Note: zip::zip() is deprecated, please use zip::zipr() instead
knitr::include_graphics(path = 'figures/batch-export-xlsx.png')
批量导出数据

图 5.1: 批量导出数据

5.7 导入大数据集

在不使用数据库的情况下,从命令行导入大数据集,如几百 M 或几个 G 的 csv 文件

https://stackoverflow.com/questions/1727772/

5.8 从数据库导入

Hands-On Programming with R 数据读写章节16 以及 R, Databases, and Docker

将大量的 txt 文本存进 MySQL 数据库中,通过操作数据库来聚合文本,极大降低内存消耗17,而 ODBC 与 DBI 包是其它数据库接口的基础,knitr 提供了一个支持 SQL 代码的引擎,它便是基于 DBI,因此可以在 R Markdown 文档中直接使用 SQL 代码块18。这里制作一个归纳表格,左边数据库右边对应其 R 接口,两边都包含链接,如表 5.4 所示

表 5.4: 数据库接口
数据库 官网 R接口 开发仓
MySQL https://www.mysql.com/ RMySQL https://github.com/r-dbi/RMySQL
SQLite https://www.sqlite.org RSQLite https://github.com/r-dbi/RSQLite
PostgreSQL https://www.postgresql.org/ RPostgres https://github.com/r-dbi/RPostgres
MariaDB https://mariadb.org/ RMariaDB https://github.com/r-dbi/RMariaDB

5.8.1 RSQLite

db <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
DBI::dbWriteTable(db, "mtcars", mtcars, overwrite=TRUE)
subjects = 6

max.print=10 控制显示的行数

SELECT * FROM mtcars where gear IN (3, 4) and cyl >= ?subjects
表 5.5: My Caption
mpg cyl disp hp drat wt qsec vs am gear carb
21.0 6 160 110 3.90 2.62 16.5 0 1 4 4
21.0 6 160 110 3.90 2.88 17.0 0 1 4 4
21.4 6 258 110 3.08 3.21 19.4 1 0 3 1
18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
18.1 6 225 105 2.76 3.46 20.2 1 0 3 1
14.3 8 360 245 3.21 3.57 15.8 0 0 3 4
19.2 6 168 123 3.92 3.44 18.3 1 0 4 4
17.8 6 168 123 3.92 3.44 18.9 1 0 4 4
16.4 8 276 180 3.07 4.07 17.4 0 0 3 3
17.3 8 276 180 3.07 3.73 17.6 0 0 3 3

output.var="mtcars34" 将查询的结果保存到变量 mtcars34,此时不再输出打印到控制台,此外,还支持 SQL 语句中包含变量 subjects

SELECT * FROM mtcars where gear IN (3, 4) and cyl >= ?subjects

我们再来查看 mtcars34 变量的内容

head(mtcars34)
#>    mpg cyl disp  hp drat   wt qsec vs am gear carb
#> 1 21.0   6  160 110 3.90 2.62 16.5  0  1    4    4
#> 2 21.0   6  160 110 3.90 2.88 17.0  0  1    4    4
#> 3 21.4   6  258 110 3.08 3.21 19.4  1  0    3    1
#> 4 18.7   8  360 175 3.15 3.44 17.0  0  0    3    2
#> 5 18.1   6  225 105 2.76 3.46 20.2  1  0    3    1
#> 6 14.3   8  360 245 3.21 3.57 15.8  0  0    3    4

5.8.2 PostgreSQL

odbc 可以支持很多数据库,下面以连接 PostgreSQL 数据库为例介绍其过程

首先在某台机器上,拉取 PostgreSQL 的 Docker 镜像

docker pull postgres

在 Docker 上运行 PostgreSQL,主机端口号 8181 映射给数据库 PostgreSQL 的默认端口号 5432(或其它你的 DBA 分配给你的端口)

docker run --name psql -d -p 8181:5432 -e ROOT=TRUE \
   -e USER=xiangyun -e PASSWORD=cloud postgres

在主机 Ubuntu 上配置

sudo apt-get install unixodbc unixodbc-dev odbc-postgresql

端口 5432 是分配给 PostgreSQL 的默认端口,host 可以是云端的地址,如 你的亚马逊账户下的 PostgreSQL 数据库地址 <ec2-54-83-201-96.compute-1.amazonaws.com>,也可以是本地局域网IP地址,如<192.168.1.200>。通过参数 dbname 连接到指定的 PostgreSQL 数据库,如 Heroku,这里作为演示就以默认的数据库 postgres 为例

library(DBI)
con <- dbConnect(RPostgres::Postgres(),
  dbname = "DATABASE_NAME",
  host = "HOST",
  port = 8181,
  user = "USERNAME",
  password = "PASSWORD"
)

列出数据库中的所有表

dbListTables(con)
#> [1] "mtcars"

第一次启动从 Docker Hub 上下载的镜像,默认的数据库是 postgres 里面没有任何表,所以将 R 环境中的 mtcars 数据集写入 postgres 数据库

DBI::dbWriteTable(con, "mtcars", mtcars, overwrite=TRUE)

现在可以看到数据表 mtcars 的各个字段

dbListFields(con, "mtcars")
#>  [1] "mpg"  "cyl"  "disp" "hp"   "drat" "wt"   "qsec" "vs"   "am"   "gear"
#> [11] "carb"

最后执行一条 SQL 语句

res <- dbSendQuery(con, "SELECT * FROM mtcars WHERE cyl = 4") # 发送 SQL 语句
dbFetch(res) # 获取查询结果
#>     mpg cyl  disp  hp drat   wt qsec vs am gear carb
#> 1  22.8   4 108.0  93 3.85 2.32 18.6  1  1    4    1
#> 2  24.4   4 146.7  62 3.69 3.19 20.0  1  0    4    2
#> 3  22.8   4 140.8  95 3.92 3.15 22.9  1  0    4    2
#> 4  32.4   4  78.7  66 4.08 2.20 19.5  1  1    4    1
#> 5  30.4   4  75.7  52 4.93 1.61 18.5  1  1    4    2
#> 6  33.9   4  71.1  65 4.22 1.83 19.9  1  1    4    1
#> 7  21.5   4 120.1  97 3.70 2.46 20.0  1  0    3    1
#> 8  27.3   4  79.0  66 4.08 1.94 18.9  1  1    4    1
#> 9  26.0   4 120.3  91 4.43 2.14 16.7  0  1    5    2
#> 10 30.4   4  95.1 113 3.77 1.51 16.9  1  1    5    2
#> 11 21.4   4 121.0 109 4.11 2.78 18.6  1  1    4    2
dbClearResult(res) # 清理查询通道

或者一条命令搞定

DBI::dbGetQuery(con, "SELECT * FROM mtcars WHERE cyl = 4")
#>     mpg cyl  disp  hp drat   wt qsec vs am gear carb
#> 1  22.8   4 108.0  93 3.85 2.32 18.6  1  1    4    1
#> 2  24.4   4 146.7  62 3.69 3.19 20.0  1  0    4    2
#> 3  22.8   4 140.8  95 3.92 3.15 22.9  1  0    4    2
#> 4  32.4   4  78.7  66 4.08 2.20 19.5  1  1    4    1
#> 5  30.4   4  75.7  52 4.93 1.61 18.5  1  1    4    2
#> 6  33.9   4  71.1  65 4.22 1.83 19.9  1  1    4    1
#> 7  21.5   4 120.1  97 3.70 2.46 20.0  1  0    3    1
#> 8  27.3   4  79.0  66 4.08 1.94 18.9  1  1    4    1
#> 9  26.0   4 120.3  91 4.43 2.14 16.7  0  1    5    2
#> 10 30.4   4  95.1 113 3.77 1.51 16.9  1  1    5    2
#> 11 21.4   4 121.0 109 4.11 2.78 18.6  1  1    4    2

最后用完数据库查询要记得关闭连接

dbDisconnect(conn = con)

5.8.3 ClickHouse

对系统的要求是 System requirements: Linux, x86_64 with SSE 4.2.

sudo apt-get install dirmngr    # optional
# 导入 key
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4    # optional
# 添加源
echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
# 安装客户端
sudo apt-get install -y clickhouse-server clickhouse-client
# 启动服务
sudo service clickhouse-server start
# 进入客户端
clickhouse-client

ClickHouse 中文用户手册 和新浪高鹏在北京 ClickHouse社区分享会 上给的报告 MySQL DBA解锁数据分析的新姿势-ClickHouse

开源社区对 ClickHouse 提供了很多语言的支持,比如 R 语言接口有 RClickhouseclickhouse-r,其它接口请看 官方文档链接

由于 clickhouse-r 还在开发中,从未提交到 CRAN,提供的功能也相对有限,这里我们推荐使用 RClickhouse,首先安装 RClickhouse

# 安装 CRAN 版本
install.packages("RClickhouse")
# 安装Github上的开发版
devtools::install_github("IMSMWU/RClickhouse")

建立数据库的连接,离不开 dbConnect 函数,它提供的 config_paths 参数用来指定配置文件 RClickhouse.yaml 的路径,在该文件中指定一系列的参数值 host, port, db, user, password, compression

host: example-db.com
port: 1111

如果没有手动配置,会使用默认的参数配置 host="localhost", port = 9000, db = "default", user = "default", password = "", compression = "lz4"。现在,我们考虑更加实用的场景,服务器上安装了 Docker,ClickHouse 部署在Docker集群上,下面介绍如何从本机连接上集群。

首先在某台机器上,拉取 ClickHouse 的 Docker 镜像

docker pull yandex/clickhouse-server

从 Yandex 官网打包的 Dockerfile 来看,其默认暴露了 9000、 8123 和 9009 三个端口,所以我在运行 ClickHouse 容器的时候,需要选择其中一个端口映射给主机端口,这里我选择 8282,值得注意的是,我主机的8787端口已经分配给了 RStudio,所以不要再把该主机端口映射给 9000

docker run --name ck -d -p 8282:9000 -e ROOT=TRUE \
   -e USER=xiangyun -e PASSWORD=cloud yandex/clickhouse-server

最后是建立连接,需要远端 Docker 容器内 Clickhouse 的 IP 地址,分配给它的主机端口,用来访问数据库,数据库的用户账户和具体的数据库名称,默认账户和密码以及数据库都是 default,下面展示连接远程 ClickHouse 数据库

library(DBI)
library(RClickhouse)
con <- DBI::dbConnect(RClickhouse::clickhouse(),
  host = "192.168.99.100", port = 8282, db = "default",
  user = "default", password = "default", compression = "lz4"
)

下面是连接本地 ClickHouse 数据库,数据库和数据分析环境都在本地,我们使用默认的9000端口和默认default数据库,而默认的账户名在安装 ClickHouse 时指定为 cloud,因此连接参数设置如下

library(DBI)
library(RClickhouse)
# con <- DBI::dbConnect(RClickhouse::clickhouse(), config_paths = "~/.R/RClickhouse.yaml")
con <- DBI::dbConnect(RClickhouse::clickhouse(),
  host = "localhost", port = 9000, db = "default",
  user = "default", password = "cloud", compression = "lz4"
)

RClickHouse 包提供部分 dplyr 式的数据操作,使用比较方便,这里便使用它了,往 ClickHouse 中写入数据

DBI::dbWriteTable(con, "mtcars", mtcars, overwrite=TRUE)

查看数据库中的 mtcars 表

# 列出 ClickHouse 中存放的表
dbListTables(con)
#> [1] "mtcars"
# 列出表 mtcars 中的所有字段
dbListFields(con, "mtcars")
#>  [1] "mpg"       "cyl"       "disp"      "hp"        "drat"      "wt"       
#>  [7] "qsec"      "vs"        "am"        "gear"      "carb"      "row_names"

RClickHouse 包支持的聚合操作(其实是 dplyr)和 Base R 提供的聚合操作对比,测试一下正确与否

library(tidyverse)
# 按变量 cyl 分组对 mpg 求和
tbl(con, "mtcars") %>% 
  group_by(cyl) %>% 
  summarise(smpg=sum(mpg, na.rm = TRUE)) # SQL 总是要移除缺失值
#> Warning: Translator is missing window variants of the following aggregate functions:
#> * %||%
#> Warning in evalq((function (..., call. = TRUE, immediate. = FALSE, noBreaks. =
#> FALSE, : column uptime converted from UInt32 to Numeric
#> # Source:   lazy query [?? x 2]
#> # Database: clickhouse 19.5.2.6 [default@192.168.99.100:8282/default; uptime:
#> #   0 days ]
#>     cyl  smpg
#>   <dbl> <dbl>
#> 1     4  293.
#> 2     8  211.
#> 3     6  138.
# 等价于
aggregate(mpg ~ cyl, data = mtcars, sum)
#>   cyl mpg
#> 1   4 293
#> 2   6 138
#> 3   8 211

# 先筛选出 cyl = 8 并且 vs = 0 的数据,然后按 am 分组,最后对 qsec 求平均值
tbl(con, "mtcars") %>% 
  filter(cyl == 8, vs == 0) %>% 
  group_by(am) %>% 
  summarise(mean(qsec, na.rm = TRUE))
#> Warning: Translator is missing window variants of the following aggregate functions:
#> * %||%
#> Warning: Translator is missing window variants of the following aggregate functions:
#> * %||%

#> Warning: Translator is missing window variants of the following aggregate functions:
#> * %||%
#> Warning in evalq((function (..., call. = TRUE, immediate. = FALSE, noBreaks. =
#> FALSE, : column uptime converted from UInt32 to Numeric
#> # Source:   lazy query [?? x 2]
#> # Database: clickhouse 19.5.2.6 [default@192.168.99.100:8282/default; uptime:
#> #   0 days ]
#>      am `mean(qsec, na.rm = TRUE)`
#>   <dbl>                      <dbl>
#> 1     0                       17.1
#> 2     1                       14.6
# 等价于
aggregate(qsec ~ am, data = mtcars, mean, subset = cyl == 8 & vs == 0)
#>   am qsec
#> 1  0 17.1
#> 2  1 14.6

你当然可以继续使用 SQL 语句做查询,而不使用 dplyr 提供的现代化的管道操作语法

# 传递 SQL 查询语句
DBI::dbGetQuery(con, 
"SELECT
    vs,
    COUNT(*) AS n_vc,
    AVG(qsec) AS avg_qsec
FROM mtcars
GROUP BY vs")
#> Warning in evalq((function (..., call. = TRUE, immediate. = FALSE, noBreaks. =
#> FALSE, : column n_vc converted from UInt64 to Numeric
#>   vs n_vc avg_qsec
#> 1  0   18     16.7
#> 2  1   14     19.3

还有 RClickhouse 使用 SQL 查询的时候,同样支持 ClickHouse 的内置函数,如 multiIf

# 查看 ClickHouse 中所有的数据库名称
DBI::dbGetQuery(con, "SHOW DATABASES")
#>      name
#> 1 default
#> 2  system
# 查看所有存储的表
DBI::dbGetQuery(con, "SHOW TABLES")
#>     name
#> 1 mtcars
# 获取 ClickHouse 中 mtcars 表的变量名和类型描述
DBI::dbGetQuery(con, "DESCRIBE TABLE mtcars")
#>         name    type default_type default_expression comment codec_expression
#> 1        mpg Float64                                                         
#> 2        cyl Float64                                                         
#> 3       disp Float64                                                         
#> 4         hp Float64                                                         
#> 5       drat Float64                                                         
#> 6         wt Float64                                                         
#> 7       qsec Float64                                                         
#> 8         vs Float64                                                         
#> 9         am Float64                                                         
#> 10      gear Float64                                                         
#> 11      carb Float64                                                         
#> 12 row_names  String
# Compact CASE - WHEN - THEN conditionals
DBI::dbGetQuery(con, "
SELECT multiIf(am=1, 'automatic', 'manual') AS transmission,
       multiIf(vs=1, 'straight', 'V-shaped') AS engine
FROM mtcars
")
#>    transmission   engine
#> 1     automatic V-shaped
#> 2     automatic V-shaped
#> 3     automatic straight
#> 4        manual straight
#> 5        manual V-shaped
#> 6        manual straight
#> 7        manual V-shaped
#> 8        manual straight
#> 9        manual straight
#> 10       manual straight
#> 11       manual straight
#> 12       manual V-shaped
#> 13       manual V-shaped
#> 14       manual V-shaped
#> 15       manual V-shaped
#> 16       manual V-shaped
#> 17       manual V-shaped
#> 18    automatic straight
#> 19    automatic straight
#> 20    automatic straight
#> 21       manual straight
#> 22       manual V-shaped
#> 23       manual V-shaped
#> 24       manual V-shaped
#> 25       manual V-shaped
#> 26    automatic straight
#> 27    automatic V-shaped
#> 28    automatic straight
#> 29    automatic V-shaped
#> 30    automatic V-shaped
#> 31    automatic V-shaped
#> 32    automatic straight
dbDisconnect(conn = con)

这是一个存放在 Github 上的包,随着 ClikHouse 在大厂的流行,此包也受到越来越多的关注 与数据仓库如何连接,如何查询数据,背后的接口 DBI 如何使用,实例化一个新的接口,如 clickhouse2r

ClickHouse 独辟蹊径,基于 C++ 的实现,数据查询速度超级快,官网介绍碾压大量传统数据库。还有不少接口,其中还有 R 的 clickhouse-r

# 安装clickhouse接口
devtools::install_github("hannesmuehleisen/clickhouse-r")
# 调用接口
library(DBI)
con <- dbConnect(clickhouse::clickhouse(),
  host = "localhost",
  port = 8123L,
  user = "default",
  password = ""
)
dbWriteTable(con, "mtcars", mtcars)
dbListTables(con)
dbGetQuery(con, "SELECT COUNT(*) FROM mtcars")
d <- dbReadTable(con, "mtcars")
dbDisconnect(con)

发现它和 knitr 里的 SQL 钩子,都用 DBI包

knitr::include_graphics(path = "figures/clickhouse.png")
ClickHouse 与 R

图 5.2: ClickHouse 与 R

学习博文 利用 JDBC 驱动连接 R 和 Hiveclickhouse-jdbc 试试 [RJDBC] 包远程连接 ClickHouse 仓库

5.8.4 MySQL

MySQL 是一个很常见,应用也很广泛的数据库,数据分析的常见环境是在一个R Notebook 里,我们可以在正文之前先设定数据库连接信息

```{r setup}
library(DBI)
# 指定数据库连接信息
db <- dbConnect(RMySQL::MySQL(),
  dbname = 'dbtest',
  username = 'user_test',
  password = 'password',
  host = '10.10.101.10',
  port = 3306
)
# 创建默认连接
knitr::opts_chunk$set(connection = 'db')
# 设置字符编码,以免中文查询乱码
DBI::dbSendQuery(db, 'SET NAMES utf8')
# 设置日期变量,以运用在SQL中
idate <- '2019-05-03'
```

SQL 代码块中使用 R 环境中的变量,并将查询结果输出为R环境中的数据框

```{sql, output.var='data_output'}
SELECT * FROM user_table where date_format(created_date,'%Y-%m-%d')>=?idate
```

以上代码会将 SQL 的运行结果存在 data_output 这是数据库中,idate 取之前设置的日期2019-05-03user_table 是 MySQL 数据库中的表名,created_date 是创建user_table时,指定的日期名。

如果 SQL 比较长,为了代码美观,把带有变量的 SQL 保存为demo.sql脚本,只需要在 SQL 的 chunk 中直接读取 SQL 文件19

```{sql, code=readLines('demo.sql'), output.var='data_output'}
```

如果我们需要每天或者按照指定的日期重复地运行这个 R Markdown 文件,可以在 YAML 部分引入参数20

---
params:
  date: "2019-05-03"  # 参数化日期
---
```{r setup, include=FALSE}
idate = params$date # 将参数化日期传递给 idate 变量
```

我们将这个 Rmd 文件命名为 MyDocument.Rmd,运行这个文件可以从 R 控制台执行或在 RStudio 点击 knit。

rmarkdown::render("MyDocument.Rmd", params = list(
  date = "2019-05-03"
))

如果在文档的 YAML 位置已经指定日期,这里可以不指定。注意在这里设置日期会覆盖 YAML 处指定的参数值,这样做的好处是可以批量化操作。

5.8.5 Spark

当数据分析报告遇上 Spark 时,就需要 SparkRsparklyrarrowrsparking 接口了, Javier Luraschi 写了一本书 The R in Spark: Learning Apache Spark with R 详细介绍了相关扩展和应用

首先安装 sparklyr 包,RStudio 公司 Javier Lurasch 开发了 sparklyr 包,作为 Spark 与 R 语言之间的接口,安装完 sparklyr 包,还是需要 Spark 和 Hadoop 环境

install.packages('sparklyr')
library(sparklyr)
spark_install()
# Installing Spark 2.4.0 for Hadoop 2.7 or later.
# Downloading from:
# - 'https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz'
# Installing to:
# - '~/spark/spark-2.4.0-bin-hadoop2.7'
# trying URL 'https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz'
# Content type 'application/x-gzip' length 227893062 bytes (217.3 MB)
# ==================================================
# downloaded 217.3 MB
# 
# Installation complete.

既然 sparklyr 已经安装了 Spark 和 Hadoop 环境,安装 SparkR 后,只需配置好路径,就可以加载 SparkR 包

install.packages('SparkR')
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "~/spark/spark-2.4.0-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

rscala 架起了 R 和 Scala 两门语言之间交流的桥梁,使得彼此之间可以互相调用

是否存在这样的可能, Spark 提供了大量的 MLib 库的调用接口,R 的功能支持是最少的,Java/Scala 是原生的,那么要么自己开发新的功能整合到 SparkR 中,要么借助 rscala 将 scala 接口代码封装进来