我給它含有具有逗號分隔的數據文件名files
列表進行清潔,以及通過包含基於文件名信息欄進一步擴展。因此,我實現了一個小read_file
函數,它處理兩個,初始清潔,以及附加列的計算。使用db.from_sequence(files).map(read_file)
,我將讀取函數映射到所有文件,每個文件都獲取一個字典列表。DASK包成DASK數據幀的列
然而,而不是字典的名單,我想我的包包含輸入文件作爲一個入門的每個單獨的線。隨後,我想將字典的鍵映射到dask數據框中的列名。
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
有人能讓我知道我必須改變以獲得此代碼的運行嗎?是否有不同的方法更適合?
編輯: 我已經創建了三個測試文件a_20160101.txt
,a_20160102.txt
,a_20160103.txt
。它們都只包含幾行,每行只有一個字符串。
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
以前我在read_file
了一個小錯誤,但現在,調用my_bag.take(10)
後映射到閱讀器工作正常:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
然而my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
隨後 my_df.head(10)
仍然引起dask.async.AssertionError: 3 columns passed, passed data had 10 columns
有一袋字典,然後傳遞給to_dataframe有什麼問題? – MRocklin