2017-02-27

I need to make some relatively simple changes to a very large CSV file (c. 8.5 GB). I initially tried various reader functions: read.csv, readr::read_csv, data.table::fread. They all run out of memory.

Instead, I think I need a stream-processing approach: read a chunk, update it, write it out, repeat. I found this answer, which is along the right lines; but I don't know how to terminate the loop (I'm relatively new to R).

So I have two questions:

  1. What is the correct way to make the while loop work?
  2. Is there a better way (for some definition of 'better')? For example, is there a way to do this using dplyr and pipes?
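For question 1 specifically, the loop-termination problem goes away if the streaming is done at the line level, because readLines() returns a zero-length character vector at end of file. A minimal, self-contained sketch (the temp files and the toupper() transform are illustrative stand-ins, not the real rebase logic):

```r
# Stream a CSV in chunks of lines; a zero-length result from readLines()
# signals end-of-file and terminates the loop.
src <- tempfile(fileext = ".csv")
tgt <- tempfile(fileext = ".csv")
writeLines(c("id,value", paste(1:25, "x", sep = ",")), src)

CHUNK_SIZE <- 10
src_conn <- file(src, "r")
tgt_conn <- file(tgt, "w")

header <- readLines(src_conn, n = 1)       # pass the header through once
writeLines(header, tgt_conn)

repeat {
    lines <- readLines(src_conn, n = CHUNK_SIZE)
    if (length(lines) == 0) break          # EOF: readLines returns character(0)
    writeLines(toupper(lines), tgt_conn)   # stand-in for the real update
}
close(src_conn)
close(tgt_conn)
```

The trade-off is that the chunks arrive as raw text, so any per-column work means parsing the lines yourself rather than getting a data frame from read.csv.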

The current code is as follows:

src_fname <- "testdata/model_input.csv"
tgt_fname <- "testdata/model_output.csv"

# Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) {
    data$'Unique Member ID' <- data$'Unique Member ID' - offset
    data$'Client Name' <- "TestClient2"
    return(data)
}

CHUNK_SIZE <- 1000
src_conn <- file(src_fname, "r")
data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE)
cols <- colnames(data)
offset <- data$'Unique Member ID'[1] - 1

data <- rebase_data(data, offset)
# 1st time through, write the headers
tgt_conn <- file(tgt_fname, "w")
write.csv(data, tgt_conn, row.names = FALSE)

# Loop over remaining data
end <- FALSE
while (end == FALSE) {
    data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE, col.names = cols)
    data <- rebase_data(data, offset)
    # write.csv doesn't support col.names = FALSE; so use write.table, which does
    write.table(data, tgt_conn, row.names = FALSE, col.names = FALSE, sep = ",")
    # ??? How to test for EOF and set end = TRUE if so ???
    # This doesn't work, presumably because nrow() != CHUNK_SIZE on final loop?
    if (nrow(data) < CHUNK_SIZE) {
        end <- TRUE
    }
}
close(src_conn)
close(tgt_conn)
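It is worth spelling out why the nrow() test behaves oddly: read.csv() happily returns a short chunk near the end of the file, so nrow(data) < CHUNK_SIZE does terminate the loop whenever the row count is not an exact multiple of CHUNK_SIZE. The failure mode is the exact-multiple case, where a further read.csv() on the exhausted connection raises an error rather than returning zero rows. A small demonstration on an illustrative temp file:

```r
# Show that read.csv() errors out, rather than returning an empty data
# frame, once a connection has been fully consumed.
f <- tempfile(fileext = ".csv")
writeLines(c("a,b", "1,2"), f)

conn <- file(f, "r")
chunk1 <- read.csv(conn, nrows = 5)             # header + the single data row
msg <- tryCatch(
    read.csv(conn, nrows = 5, header = FALSE),  # connection now exhausted
    error = function(e) conditionMessage(e)     # typically "no lines available in input"
)
close(conn)
```

So a robust loop either counts lines with readLines(), or wraps the read in tryCatch() and treats the error as an empty chunk.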

Any pointers gratefully received.


Check out the CRAN package 'chunked'. It allows chunkwise reading from text files and, particularly interesting, chunkwise processing with dplyr. There's no vignette, but an introduction to usage is at https://github.com/edwindj/chunked/ I meant to try it myself but haven't found the time! –

Answers


OK, I worked out a solution, as follows:

# src_fname <- "testdata/model_input.csv"
# tgt_fname <- "testdata/model_output.csv"

CHUNK_SIZE <- 20000

# Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) {
    data$'Unique Member ID' <- data$'Unique Member ID' - offset
    data$'Client Name' <- "TestClient2"
    return(data)
}

#--------------------------------------------------------
# Get the structure first to speed things up
#--------------------------------------------------------
structure <- read.csv(src_fname, nrows = 2, check.names = FALSE)
cols <- colnames(structure)
offset <- structure$'Unique Member ID'[1] - 1

# Open the input & output files for reading & writing
src_conn <- file(src_fname, "r")
tgt_conn <- file(tgt_fname, "w")

lines_read <- 0
end <- FALSE
read_header <- TRUE
write_header <- TRUE
while (end == FALSE) {
    # If the row count is an exact multiple of CHUNK_SIZE, the final read hits
    # an already-exhausted connection and read.csv() errors out; treat that as
    # an empty chunk rather than a failure.
    data <- tryCatch(
        read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE,
                 col.names = cols, header = read_header),
        error = function(e) structure[0, ])
    if (nrow(data) > 0) {
        lines_read <- lines_read + nrow(data)
        print(paste0("lines read this chunk: ", nrow(data),
                     ", lines read so far: ", lines_read))
        data <- rebase_data(data, offset)
        # write.csv doesn't support col.names = FALSE; so use write.table, which does
        write.table(data, tgt_conn, row.names = FALSE, col.names = write_header, sep = ",")
    }
    if (nrow(data) < CHUNK_SIZE) {
        end <- TRUE
    }
    read_header <- FALSE
    write_header <- FALSE
}
close(src_conn)
close(tgt_conn)
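As a sanity check, here is the same pattern run end-to-end on a small generated file. The 25-row toy data and temp-file names are illustrative; the loop follows the answer's pattern, with a tryCatch() guard added for the case where the row count is an exact multiple of CHUNK_SIZE (not triggered by this toy data, but harmless):

```r
# Generate a small input file with the same column names as above.
src <- tempfile(fileext = ".csv")
tgt <- tempfile(fileext = ".csv")
df  <- data.frame(`Unique Member ID` = 101:125,
                  `Client Name` = "OldClient", check.names = FALSE)
write.csv(df, src, row.names = FALSE)

rebase_data <- function(data, offset) {
    data$`Unique Member ID` <- data$`Unique Member ID` - offset
    data$`Client Name` <- "TestClient2"
    data
}

CHUNK_SIZE <- 10
structure_df <- read.csv(src, nrows = 2, check.names = FALSE)
cols   <- colnames(structure_df)
offset <- structure_df$`Unique Member ID`[1] - 1   # rebases 101... down to 1...

src_conn <- file(src, "r")
tgt_conn <- file(tgt, "w")
read_header <- TRUE
write_header <- TRUE
end <- FALSE
while (!end) {
    data <- tryCatch(
        read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE,
                 col.names = cols, header = read_header),
        error = function(e) structure_df[0, ])  # EOF on an exact chunk boundary
    if (nrow(data) > 0) {
        data <- rebase_data(data, offset)
        write.table(data, tgt_conn, row.names = FALSE,
                    col.names = write_header, sep = ",")
    }
    if (nrow(data) < CHUNK_SIZE) end <- TRUE
    read_header <- FALSE
    write_header <- FALSE
}
close(src_conn)
close(tgt_conn)

out <- read.csv(tgt, check.names = FALSE)
```

Reading the structure once up front, as the answer does, avoids re-deriving column names on every chunk; the chunks themselves never hold more than CHUNK_SIZE rows in memory.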

Try this:

library("chunked")

read_chunkwise(src_fname, chunk_size = CHUNK_SIZE) %>%
    rebase_data(offset) %>%
    write_chunkwise(tgt_fname)

You may have to fiddle a bit with the colnames to get exactly what you want.

(Disclaimer: haven't tried the code.)

Note that there's no vignette with the package, but the standard usage is described on GitHub: https://github.com/edwindj/chunked/


Thanks very much - hadn't come across chunked in my googling. Looks like just the thing. – sfinnie