Reading one HDF5 file with pandas concurrently

I have a data.h5 file organised into multiple chunks; the whole file is several hundred gigabytes in size. I need to work with a filtered subset of the file in memory, in the form of a Pandas DataFrame.

The goal of the routine below is to distribute the filtering work across several processes and then concatenate the filtered results into the final DataFrame.

Since reading from the file takes a lot of time, I am trying to have each process read its own chunk concurrently.

import multiprocessing as mp, pandas as pd 

store = pd.HDFStore('data.h5') 
min_dset, max_dset = 0, len(store.keys()) - 1 
dset_list = list(range(min_dset, max_dset)) 

frames = [] 

def read_and_return_subset(dset): 
    # each process is intended to read its own chunk in a concurrent manner 
    chunk = store.select('batch_{:03}'.format(dset)) 

    # and then process the chunk, do the filtering, and return the result 
    output = chunk[chunk.some_condition == True] 
    return output 


with mp.Pool(processes=32) as pool: 
    for frame in pool.map(read_and_return_subset, dset_list): 
        frames.append(frame)

df = pd.concat(frames) 

However, the code above triggers this error:

HDF5ExtError        Traceback (most recent call last) 
<ipython-input-174-867671c5a58f> in <module>() 
    53 
    54  with mp.Pool(processes=32) as pool: 
---> 55   for frame in pool.map(read_and_return_subset, dset_list): 
    56    frames.append(frame) 
    57 

/usr/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize) 
    258   in a list that is returned. 
    259   ''' 
--> 260   return self._map_async(func, iterable, mapstar, chunksize).get() 
    261 
    262  def starmap(self, func, iterable, chunksize=None): 

/usr/lib/python3.5/multiprocessing/pool.py in get(self, timeout) 
    606    return self._value 
    607   else: 
--> 608    raise self._value 
    609 
    610  def _set(self, i, obj): 

HDF5ExtError: HDF5 error back trace 

    File "H5Dio.c", line 173, in H5Dread 
    can't read data 
    File "H5Dio.c", line 554, in H5D__read 
    can't read data 
    File "H5Dchunk.c", line 1856, in H5D__chunk_read 
    error looking up chunk address 
    File "H5Dchunk.c", line 2441, in H5D__chunk_lookup 
    can't query chunk address 
    File "H5Dbtree.c", line 998, in H5D__btree_idx_get_addr 
    can't get chunk info 
    File "H5B.c", line 340, in H5B_find 
    unable to load B-tree node 
    File "H5AC.c", line 1262, in H5AC_protect 
    H5C_protect() failed. 
    File "H5C.c", line 3574, in H5C_protect 
    can't load entry 
    File "H5C.c", line 7954, in H5C_load_entry 
    unable to load entry 
    File "H5Bcache.c", line 143, in H5B__load 
    wrong B-tree signature 

End of HDF5 error back trace 

Problems reading the array data. 

It appears that pandas/PyTables runs into trouble when the same file is accessed concurrently, even when it is only being read.

Is there a way to let each process read its own chunk concurrently?


I'm not sure you can speed up the IO by parallelising it - you still have to read exactly the same amount of data from disk, plus the 'multiprocessing' overhead. What are you going to do with the chunks you are reading? Do you need to process all the data or only part of it? – MaxU


@MaxU我在閱讀您的評論後深入挖掘,看起來大部分時間都是花在由h5庫處理文件上,而不是實際上從磁盤上流式傳輸文件。事實證明,將相同的文件放入基於RAM的磁盤中看不到速度的提升。因此,我認爲同時閱讀文件可能會加快整個過程。 – Jivan

Answer


IIUC, you can index the columns that are used for filtering the data (chunk.some_condition == True in your sample code) and then read only the subset of data that satisfies the required conditions.

In order to be able to do that, you need to:

  1. Save the HDF5 file in table format - use the parameter format='table'
  2. Index the columns that will be used for filtering - use the parameter data_columns=['col_name1', 'col_name2', etc.] (a small write sketch follows below)
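
For illustration, a minimal write sketch (key_name, col1 and col2 are the placeholder names reused from the read example below; to_hdf is just one way to write such a table):

import pandas as pd 

# toy frame; col1/col2 stand in for whatever columns the filter uses 
df = pd.DataFrame({'col1': [11, 12, 13], 'col2': ['AAA', 'BBB', 'AAA']}) 

# format='table' stores a queryable PyTables table instead of the default 'fixed' layout; 
# data_columns builds on-disk indexes for the listed columns so they can appear in a 'where' clause 
df.to_hdf('data.h5', 'key_name', format='table', data_columns=['col1', 'col2']) 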

After that, you should be able to filter the data just by reading:

store = pd.HDFStore(filename) 
df = store.select('key_name', where="col1 in [11,13] & col2 == 'AAA'") 
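
Applied to the layout in the question, this could replace the multiprocessing filter with a single pass that only pulls the matching rows out of each batch (a sketch, assuming some_condition was listed in data_columns when the batches were written):

store = pd.HDFStore('data.h5', mode='r') 
# store.keys() returns the '/batch_000', '/batch_001', ... keys written earlier 
frames = [store.select(key, where='some_condition == True') for key in store.keys()] 
df = pd.concat(frames) 
store.close() 

Whether this beats the fixed format depends on how selective the filter is; the where clause only pays off when the matching subset is much smaller than the full tables.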

Interesting. However, tests show that writing to and reading from a file saved in 'table' format is much slower than writing/reading with the 'fixed' format. – Jivan


@Jivan, writing will definitely be slower (because of the overhead and the additional indexes). Reading will usually be faster when a 'where' clause is used. If you often need to read __everything__ from disk, then you may want to keep the fixed format... – MaxU