
I am looking for a way to read data from multiple partitioned directories on S3 using Python, i.e. how to read partitioned parquet files from S3 with pyarrow.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module is able to read from partitioned directories, so I tried the following code:

>>> import pandas as pd 
>>> import pyarrow.parquet as pq 
>>> import s3fs 
>>> a = "s3://my_bucker/path/to/data_folder/" 
>>> dataset = pq.ParquetDataset(a) 

It threw the following error:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__ 
    self.metadata_path) = _make_manifest(path_or_paths, self.fs) 
    File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest 
    .format(path)) 
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/ 

Based on pyarrow's documentation, I then tried passing s3fs as the file system, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs) 

which throws the following error:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__ 
    self.metadata_path) = _make_manifest(path_or_paths, self.fs) 
    File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest 
    if is_string(path_or_paths) and fs.isdir(path_or_paths): 
AttributeError: module 's3fs' has no attribute 'isdir' 

I am restricted to an ECS cluster, so spark/pyspark is not an option.

Is there a way to easily read parquet files from such partitioned directories in S3 with Python? I feel that listing all the directories and then reading each one is not good practice, as suggested in this link. I need to convert the data into a pandas dataframe for further processing, so I would prefer options based on fastparquet or pyarrow, though I am open to other Python options as well.

Answer


I managed to get this working with the latest release of fastparquet & s3fs. Below is the code:

import s3fs 
import fastparquet as fp 
s3 = s3fs.S3FileSystem() 
fs = s3fs.core.S3FileSystem() 

#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet 
s3_path = "mybucket/data_folder/*/*/*.parquet" 
all_paths_from_s3 = fs.glob(path=s3_path) 

myopen = s3.open 
#use s3fs as the filesystem 
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen) 
#convert to pandas dataframe 
df = fp_obj.to_pandas() 
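
If only a subset of the partitions is needed, the same approach should also work with a narrower glob pattern; the serial_number=1 value below is just an illustration taken from the directory layout in the question:

#read only the files under serial_number=1
s3_path_filtered = "mybucket/data_folder/serial_number=1/*/*.parquet"
filtered_paths = fs.glob(path=s3_path_filtered)
fp_filtered = fp.ParquetFile(filtered_paths, open_with=myopen)
df_filtered = fp_filtered.to_pandas()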

Credit to martin for pointing me in the right direction via our conversation.

NB: this would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.
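
For reference, the AttributeError in the question comes from passing the s3fs module itself as the filesystem argument; pyarrow expects a file system object. Once that support is in place, the pyarrow version should look roughly like the sketch below (untested, since released pyarrow did not support this at the time of writing):

import pyarrow.parquet as pq
import s3fs

#pass an S3FileSystem instance, not the s3fs module
fs = s3fs.S3FileSystem()
#bucket/key path, without the s3:// prefix
dataset = pq.ParquetDataset("my_bucker/path/to/data_folder/", filesystem=fs)
df = dataset.read().to_pandas()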

I did a quick benchmark on individual iterations with pyarrow, versus the list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code, but I reckon pyarrow + s3fs will be faster once that support is implemented.

The code and benchmark are below:

>>> def test_pq(): 
...  for current_file in list_parquet_files: 
...   f = fs.open(current_file) 
...   df = pq.read_table(f).to_pandas() 
...   # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe 
...   #probably not the best way to split :) 
...   elements_list=current_file.split('/') 
...   for item in elements_list: 
...    if item.find(date_partition) != -1: 
...     current_date = item.split('=')[1] 
...    elif item.find(dma_partition) != -1: 
...     current_dma = item.split('=')[1] 
...   df['serial_number'] = current_dma 
...   df['cur_date'] = current_date 
...   list_.append(df) 
...  frame = pd.concat(list_) 
... 
>>> timeit.timeit('test_pq()',number =10,globals=globals()) 
12.078817503992468 

>>> def test_fp(): 
...  fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen) 
...  df = fp_obj.to_pandas() 

>>> timeit.timeit('test_fp',number =10,globals=globals()) 
2.0100269466638565e-06 
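
For completeness, test_pq() above relies on a few names defined earlier in the session that are not shown in the transcript; a plausible setup, with the partition names taken from the directory layout in the question, would be:

import pandas as pd
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()
#one path per parquet file across all partitions
list_parquet_files = fs.glob("mybucket/data_folder/*/*/*.parquet")
date_partition = 'cur_date'       #partition key holding the date
dma_partition = 'serial_number'   #partition key holding the serial number
list_ = []                        #collects one dataframe per file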
