10 Extração, Transformação e Carga para integração de dados administrativos com R e linguagem estruturada de consulta (SQL).

10.1 Exemplo mínimo

dbc=read.dbc("/media/ferre/ferre128/Downloads/dbc/PAMS1808.dbc")
# coloca o nome das colunas em minusculo
colnames(dbc)=tolower(colnames(dbc))
# filtra apenas os medicamentos (SIGTAP 06)
dbc=subset(dbc, substr(pa_proc_id,1,2) == '06')


library(RPostgreSQL)

pgcon <- DBI::dbConnect(dbDriver(drvName = "PostgreSQL"),
                        dbname="labxss",
                        host="localhost", 
                        port=5432,
                        user = 'ferre',
                        password = "9015")

# tb=dbGetQuery(pgcon, query)

dbWriteTable(
  conn = pgcon, 
  c("tmp","ammg1805"), 
  dbc[ ,c('ap_autoriz', 'ap_cnspcn', 'ap_cmp' , 'ap_pripal', 'ap_vl_ap', 'ap_cidpri', 'ap_cidsec', 'ap_gestao', 'ap_coduni')], 
  row.names=FALSE, 
  #  append=TRUE,
  overwrite = TRUE
)

10.2 Fluxo para extração de dados de medicamentos do FTP do DATASUS

O código abaixo é reduzido à tabela SIA AM (medicamentos) do estado do Espírito Santo.

Respostas.

Respostas.

Respostas.

Respostas.

# install.packages("RCurl") 
# install.packages("htmlTable")
library("RCurl") 
library("htmlTable")


# variaveis   

  # url de base
  url="ftp://ftp.datasus.gov.br/dissemin/publicos/"

  # diretorio local para arquivos dbc
  dirdbc="~/Dropbox/cursoHAOC/SIAAM_ES/"
  

# lista de url complementar
lst=as.data.frame(matrix(c("SIA", "SIASUS/200801_/Dados/"), byrow = TRUE, ncol = 2))
colnames(lst)=c("no_sistema","ds_url")

listaftp <- read.table(text=getURL(ds_url), header = FALSE)
colnames(listaftp)=c("dt_atualizado_ftp","hs_atualizado_ftp","qt_bytes_ftp","no_dbc")

# nome do arquivo sem extensao e minusculo
  listaftp$no_arquivo=tolower(gsub("([0-9]+).*$", "\\1", listaftp$no_dbc))
  
  # aloca o nome do sistema
  listaftp$no_sistema=no_sistema
  
  # obtem a sigla da tabela (subsistema)
  listaftp$no_tabela=sub("^([[:alpha:]]*).*", "\\1", listaftp$no_dbc)
  listaftp$no_tabela=toupper(substr(listaftp$no_tabela,1,nchar(listaftp$no_tabela)-2))

  # sigla do local (uf ou brasil)
  listaftp$sg_local=sub("^([[:alpha:]]*).*", "\\1", listaftp$no_dbc)
  listaftp$sg_local=toupper(substr(listaftp$sg_local,nchar(listaftp$sg_local)-1,nchar(listaftp$sg_local)))
    
  # obtem a competencia (aaaamm)
  listaftp$nu_competencia=regmatches(listaftp$no_dbc, gregexpr("[[:digit:]]+", listaftp$no_dbc))
  listaftp$nu_competencia=paste(ifelse(substr(listaftp$nu_competencia,1,2) < 80, 20, 19),listaftp$nu_competencia, sep="")
  
  # aloca a url
  listaftp$ds_url=ds_url
  
  lista=rbind(lista,listaftp)

# obtem apenas tabela AM, do Espirito Santo a partir de dezembro de 2018
listaftp=subset(listaftp, no_tabela=='AM' & sg_local=='ES' & nu_competencia > 201811)


for (i in  1:dim(listaftp)[1]) {
  download.file(
    paste(listaftp$ds_url[i], listaftp$no_dbc[i], sep=""), 
    destfile = paste(dirdbc, listaftp$no_dbc[i], sep=""), 
    method = "wget", 
    extra = "-r -p --random-wait"
  )  
}  

10.3 Carga no PostgreSQL de arquivos DBC (SIA AM)

library(read.dbc) # arquivos dbc
library(RPostgreSQL) # conexão com o Sistema Gerenciador de Banco de Dados
library(gsubfn) # busca e substituicao

pgcon <- DBI::dbConnect(dbDriver(drvName = "PostgreSQL"),
                        dbname="labxss",
                        host="localhost", 
                        port=5432,
                        user = 'ferre',
                        password = "9015")


# Cria tabela para acomodar os atributos de interesse da tabela AM

# tabela temporaria de dispensação
query="
DROP TABLE IF EXISTS tmp.tf_dispensacao_am;
CREATE TABLE tmp.tf_dispensacao_am (
  AP_AUTORIZ int8 NULL,
  AP_CNSPCN int8 NULL,
  AP_CMP int4 NULL,
  AP_PRIPAL int8 NULL,
  AP_VL_AP decimal(10,2) NULL,
  AP_CIDPRI varchar(4) NULL,
  AP_CIDSEC varchar(4) NULL,
  AP_GESTAO int4 NULL,
  AP_CODUNI int4 NULL
);
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL

# tabela temporaria de usuarios do sus
query="
DROP TABLE IF EXISTS tmp.tm_usuariosus_am;
CREATE TABLE tmp.tm_usuariosus_am (
ap_cnspcn int8 NULL,
ap_sexo varchar(1),
ap_nuidade int2 NULL,
ap_cmp int4 NULL,
ap_obito int2 NULL,
am_peso int2 NULL,
am_altura int2 NULL
);
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL

# tabela temporaria para definir a residencia do usuario
query="
DROP TABLE IF EXISTS tmp.tf_usuariosus_residencia_am;
CREATE TABLE tmp.tf_usuariosus_residencia_am (
ap_cnspcn int8 NULL,
ap_munpcn int4 NULL,
ap_ceppcn int4 NULL
);
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL


# diretorio local onde foram baixados os arquivos dbc
dirdbc="~/Dropbox/cursoHAOC/SIAAM_ES/"

vector=list.files(dirdbc) # lista de arquivos locais

# para cada arquivo local
for (file in vector[grep("^AM", vector)]) {
  
# le o arquivo dbc
dbc=read.dbc(paste(dirdbc, file, sep = ""))
# nome das colunas em minusculo devido ao SGBD
colnames(dbc)=tolower(colnames(dbc))

# obtem apenas numeros
dbc$am_altura=as.numeric(gsub("([0-9]+).*$", "\\1", dbc$am_altura))
dbc$am_peso=as.numeric(gsub("([0-9]+).*$", "\\1", dbc$am_peso))

# converte CNS em caracteres numericos
dbc$ap_cnspcn=
    gsubfn(
      ".", 
      list(
        "{" = "0", "}" = "9", "~" = "8", 
        "\177" = "7", "Ç" = "6", "ä" = "5", 
        "ü" = "4", "é" = "3", "|" = "2", "â" = "1"
        ), 
      iconv(dbc$ap_cnspcn, "CP861", "UTF-8")
    )

# carga na tabela de usuario
dbWriteTable(
  conn = pgcon, 
  c("tmp","tm_usuariosus_am"), 
  dbc[ ,c('ap_cnspcn', 'ap_sexo' , 'ap_nuidade', 'ap_cmp', 'ap_obito', 'am_peso', 'am_altura')], 
  row.names=FALSE, 
  append=TRUE
)

# carga na tabela de residencia de usuario
dbWriteTable(
  conn = pgcon, 
  c("tmp","tf_usuariosus_residencia_am"), 
  dbc[ ,c('ap_cnspcn', 'ap_munpcn' , 'ap_ceppcn')], 
  row.names=FALSE, 
  append=TRUE
)

# carga na tabela de dispensacao
dbWriteTable(
  conn = pgcon, 
  c("tmp","tf_dispensacao_am"), 
  dbc[ ,c('ap_autoriz', 'ap_cnspcn', 'ap_cmp' , 'ap_pripal', 'ap_vl_ap', 'ap_cidpri', 'ap_cidsec', 'ap_gestao', 'ap_coduni')], 
  row.names=FALSE, 
  append=TRUE # , overwrite = TRUE
)

# utilizar caso esteja rodando no console
# print(file) 

# cria tabelas FATO

query="
drop table if exists tmp.tf_usuariosus_am;
create table tmp.tf_usuariosus_am as
select 
    ap_cnspcn ,
    'F' as sg_sexo,
    min(substr(ap_nuidade::text,1,4)::int-ap_nuidade) as nu_ano_nascimento,
    sum(ap_obito) as ap_obito,
    max(am_peso) as am_peso,
    max(am_altura) as am_altura
from TMP.tm_usuariosus_am
group by 1;

create table tmp.tm_sexopcn as
select ap_cnspcn,
       sum(case when ap_sexo = 'F' then 1 else 0 end) as qt_sexoF,
       sum(case when ap_sexo = 'M' then 1 else 0 end) as qt_sexoM
  from tmp.tm_usuariosus_am
 group by ap_cnspcn,
          ap_sexo;
         
update tmp.tf_usuariosus_am A
   set sg_sexo = 'M'
  from tmp.tm_sexopcn B
 where A.ap_cnspcn=B.ap_cnspcn
   and qt_sexoM > qt_sexoF;
         
drop table if exists tmp.tm_usuariosus_am, tmp.tm_sexopcn;
"
dbGetQuery(pgcon, query) # executa a consulta no PGSQL


}