2016-08-22 62 views
1

我給它含有具有逗號分隔的數據文件名files列表進行清潔,以及通過包含基於文件名信息欄進一步擴展。因此,我實現了一個小read_file函數,它處理兩個,初始清潔,以及附加列的計算。使用db.from_sequence(files).map(read_file),我將讀取函數映射到所有文件,每個文件都獲取一個字典列表。DASK包成DASK數據幀的列

然而,而不是字典的名單,我想我的包包含輸入文件作爲一個入門的每個單獨的線。隨後,我想將字典的鍵映射到dask數據框中的列名。

from dask import bag as db 

def read_file(filename): 
    ret = [] 
    with open(filename, 'r') as fp: 
     ... # reading line of file and storing result in dict 
     ret.append({'a': val_a, 'b': val_b, 'c': val_c}) 

    return ret 

from dask import bag as db 
files = ['a.txt', 'b.txt', 'c.txt'] 
my_bag = db.from_sequence(files).map(read_file) 
# a,b,c are the keys of the dictionaries returned by read_file 
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']) 

有人能讓我知道我必須改變以獲得此代碼的運行嗎?是否有不同的方法更適合?

編輯: 我已經創建了三個測試文件a_20160101.txt,a_20160102.txt,a_20160103.txt。它們都只包含幾行,每行只有一個字符串。

asdf 
sadfsadf 
sadf 
fsadff 
asdf 
sadfasd 
fa 
sf 
ads 
f 

以前我在read_file了一個小錯誤,但現在,調用my_bag.take(10)後映射到閱讀器工作正常:

([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],) 

然而my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])隨後 my_df.head(10)仍然引起dask.async.AssertionError: 3 columns passed, passed data had 10 columns

+0

有一袋字典,然後傳遞給to_dataframe有什麼問題? – MRocklin

回答

0

你可能需要調用concat

您的文件名的包看起來是這樣的:

['a.txt', 
'b.txt', 
'c.txt'] 

在調用映射你的包是這樣的:

[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}], 
[{'a': 1, 'b': 2, 'c': 3}], 
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]] 

每個文件被闢爲類型的字典列表。現在你的包有點像一個列表的列表。

.to_dataframe方法,希望你有一個列表的-類型的字典。因此,讓我們的行李連接成一個單一的扁平化集合的書稿

my_bag = db.from_sequence(files).map(read_file).concat() 

[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}, 
{'a': 1, 'b': 2, 'c': 3}, 
{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}] 
+0

''concat()'它完美的工作,謝謝!使用不同的方法會更好嗎?還是變得更好?如果我沒記錯的[DASK教程(https://github.com/dask/dask-tutorial)正確,包裝袋上應罰款攝入/轉換,這是正確的? – sim

+0

這對我來說似乎是一個合理的方法。你也可以嘗試[dask.delayed](http://dask.readthedocs.io/en/latest/delayed.html)。請參閱[使用藏品]的註釋(http://dask.readthedocs.io/en/latest/delayed-collections.html) – MRocklin