10 Extração, Transformação e Carga para integração de dados administrativos com R e linguagem estruturada de consulta (SQL).
10.1 Exemplo mínimo
dbc=read.dbc("/media/ferre/ferre128/Downloads/dbc/PAMS1808.dbc")
# coloca o nome das colunas em minusculo
colnames(dbc)=tolower(colnames(dbc))
# filtra apenas os medicamentos (SIGTAP 06)
dbc=subset(dbc, substr(pa_proc_id,1,2) == '06')
library(RPostgreSQL)
pgcon <- DBI::dbConnect(dbDriver(drvName = "PostgreSQL"),
dbname="labxss",
host="localhost",
port=5432,
user = 'ferre',
password = "9015")
# tb=dbGetQuery(pgcon, query)
dbWriteTable(
conn = pgcon,
c("tmp","ammg1805"),
dbc[ ,c('ap_autoriz', 'ap_cnspcn', 'ap_cmp' , 'ap_pripal', 'ap_vl_ap', 'ap_cidpri', 'ap_cidsec', 'ap_gestao', 'ap_coduni')],
row.names=FALSE,
# append=TRUE,
overwrite = TRUE
)
10.2 Fluxo para extração de dados de medicamentos do FTP do DATASUS
O código abaixo é reduzido à tabela SIA AM (medicamentos) do estado do Espírito Santo.
# install.packages("RCurl")
# install.packages("htmlTable")
library("RCurl")
library("htmlTable")
# variaveis
# url de base
url="ftp://ftp.datasus.gov.br/dissemin/publicos/"
# diretorio local para arquivos dbc
dirdbc="~/Dropbox/cursoHAOC/SIAAM_ES/"
# lista de url complementar
lst=as.data.frame(matrix(c("SIA", "SIASUS/200801_/Dados/"), byrow = TRUE, ncol = 2))
colnames(lst)=c("no_sistema","ds_url")
listaftp <- read.table(text=getURL(ds_url), header = FALSE)
colnames(listaftp)=c("dt_atualizado_ftp","hs_atualizado_ftp","qt_bytes_ftp","no_dbc")
# nome do arquivo sem extensao e minusculo
listaftp$no_arquivo=tolower(gsub("([0-9]+).*$", "\\1", listaftp$no_dbc))
# aloca o nome do sistema
listaftp$no_sistema=no_sistema
# obtem a sigla da tabela (subsistema)
listaftp$no_tabela=sub("^([[:alpha:]]*).*", "\\1", listaftp$no_dbc)
listaftp$no_tabela=toupper(substr(listaftp$no_tabela,1,nchar(listaftp$no_tabela)-2))
# sigla do local (uf ou brasil)
listaftp$sg_local=sub("^([[:alpha:]]*).*", "\\1", listaftp$no_dbc)
listaftp$sg_local=toupper(substr(listaftp$sg_local,nchar(listaftp$sg_local)-1,nchar(listaftp$sg_local)))
# obtem a competencia (aaaamm)
listaftp$nu_competencia=regmatches(listaftp$no_dbc, gregexpr("[[:digit:]]+", listaftp$no_dbc))
listaftp$nu_competencia=paste(ifelse(substr(listaftp$nu_competencia,1,2) < 80, 20, 19),listaftp$nu_competencia, sep="")
# aloca a url
listaftp$ds_url=ds_url
lista=rbind(lista,listaftp)
# obtem apenas tabela AM, do Espirito Santo a partir de dezembro de 2018
listaftp=subset(listaftp, no_tabela=='AM' & sg_local=='ES' & nu_competencia > 201811)
for (i in 1:dim(listaftp)[1]) {
download.file(
paste(listaftp$ds_url[i], listaftp$no_dbc[i], sep=""),
destfile = paste(dirdbc, listaftp$no_dbc[i], sep=""),
method = "wget",
extra = "-r -p --random-wait"
)
}
10.3 Carga no PostgreSQL de arquivos DBC (SIA AM)
library(read.dbc) # arquivos dbc
library(RPostgreSQL) # conexão com o Sistema Gerenciador de Banco de Dados
library(gsubfn) # busca e substituicao
pgcon <- DBI::dbConnect(dbDriver(drvName = "PostgreSQL"),
dbname="labxss",
host="localhost",
port=5432,
user = 'ferre',
password = "9015")
# Cria tabela para acomodar os atributos de interesse da tabela AM
# tabela temporaria de dispensação
query="
DROP TABLE IF EXISTS tmp.tf_dispensacao_am;
CREATE TABLE tmp.tf_dispensacao_am (
AP_AUTORIZ int8 NULL,
AP_CNSPCN int8 NULL,
AP_CMP int4 NULL,
AP_PRIPAL int8 NULL,
AP_VL_AP decimal(10,2) NULL,
AP_CIDPRI varchar(4) NULL,
AP_CIDSEC varchar(4) NULL,
AP_GESTAO int4 NULL,
AP_CODUNI int4 NULL
);
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL
# tabela temporaria de usuarios do sus
query="
DROP TABLE IF EXISTS tmp.tm_usuariosus_am;
CREATE TABLE tmp.tm_usuariosus_am (
ap_cnspcn int8 NULL,
ap_sexo varchar(1),
ap_nuidade int2 NULL,
ap_cmp int4 NULL,
ap_obito int2 NULL,
am_peso int2 NULL,
am_altura int2 NULL
);
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL
# tabela temporaria para definir a residencia do usuario
query="
DROP TABLE IF EXISTS tmp.tf_usuariosus_residencia_am;
CREATE TABLE tmp.tf_usuariosus_residencia_am (
ap_cnspcn int8 NULL,
ap_munpcn int4 NULL,
ap_ceppcn int4 NULL
);
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL
# diretorio local onde foram baixados os arquivos dbc
dirdbc="~/Dropbox/cursoHAOC/SIAAM_ES/"
vector=list.files(dirdbc) # lista de arquivos locais
# para cada arquivo local
for (file in vector[grep("^AM", vector)]) {
# le o arquivo dbc
dbc=read.dbc(paste(dirdbc, file, sep = ""))
# nome das colunas em minusculo devido ao SGBD
colnames(dbc)=tolower(colnames(dbc))
# obtem apenas numeros
dbc$am_altura=as.numeric(gsub("([0-9]+).*$", "\\1", dbc$am_altura))
dbc$am_peso=as.numeric(gsub("([0-9]+).*$", "\\1", dbc$am_peso))
# converte CNS em caracteres numericos
dbc$ap_cnspcn=
gsubfn(
".",
list(
"{" = "0", "}" = "9", "~" = "8",
"\177" = "7", "Ç" = "6", "ä" = "5",
"ü" = "4", "é" = "3", "|" = "2", "â" = "1"
),
iconv(dbc$ap_cnspcn, "CP861", "UTF-8")
)
# carga na tabela de usuario
dbWriteTable(
conn = pgcon,
c("tmp","tm_usuariosus_am"),
dbc[ ,c('ap_cnspcn', 'ap_sexo' , 'ap_nuidade', 'ap_cmp', 'ap_obito', 'am_peso', 'am_altura')],
row.names=FALSE,
append=TRUE
)
# carga na tabela de residencia de usuario
dbWriteTable(
conn = pgcon,
c("tmp","tf_usuariosus_residencia_am"),
dbc[ ,c('ap_cnspcn', 'ap_munpcn' , 'ap_ceppcn')],
row.names=FALSE,
append=TRUE
)
# carga na tabela de dispensacao
dbWriteTable(
conn = pgcon,
c("tmp","tf_dispensacao_am"),
dbc[ ,c('ap_autoriz', 'ap_cnspcn', 'ap_cmp' , 'ap_pripal', 'ap_vl_ap', 'ap_cidpri', 'ap_cidsec', 'ap_gestao', 'ap_coduni')],
row.names=FALSE,
append=TRUE # , overwrite = TRUE
)
# utilizar caso esteja rodando no console
# print(file)
# cria tabelas FATO
query="
drop table if exists tmp.tf_usuariosus_am;
create table tmp.tf_usuariosus_am as
select
ap_cnspcn ,
'F' as sg_sexo,
min(substr(ap_nuidade::text,1,4)::int-ap_nuidade) as nu_ano_nascimento,
sum(ap_obito) as ap_obito,
max(am_peso) as am_peso,
max(am_altura) as am_altura
from TMP.tm_usuariosus_am
group by 1;
create table tmp.tm_sexopcn as
select ap_cnspcn,
sum(case when ap_sexo = 'F' then 1 else 0 end) as qt_sexoF,
sum(case when ap_sexo = 'M' then 1 else 0 end) as qt_sexoM
from tmp.tm_usuariosus_am
group by ap_cnspcn,
ap_sexo;
update tmp.tf_usuariosus_am A
set sg_sexo = 'M'
from tmp.tm_sexopcn B
where A.ap_cnspcn=B.ap_cnspcn
and qt_sexoM > qt_sexoF;
drop table if exists tmp.tm_usuariosus_am, tmp.tm_sexopcn;
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL
}