2017-04-24 50 views
0

我試圖從read_parquet Concat的DaskDataFrame,然後應用查詢過濾器,然後品嚐它封頂最終數據幀大小小於或等於10000下面是僞代碼:DASK數據框中查詢然後抽樣誤差

import dask.dataframe as dd 

df = dd.concat([ dd.read_parquet(path, index='date').query("(col0 < 4) & (date < '20170201')") 
       for path in files ], 
       interleave_partitions=True) 
df = df.sample(float(10000)/max(10000, len(df))) 
df = df.compute() 

然而,它沒有用:

ValueError: a must be greater than 0 

Traceback 
--------- 
    File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 266, in execute_task 
    result = _execute_task(task, data) 
    File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 247, in _execute_task 
    return func(*args2) 
    File "/opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py", line 143, in sample 
    return df.sample(random_state=rs, frac=frac, replace=replace) 
    File "/opt/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 2644, in sample 
    locs = rs.choice(axis_length, size=n, replace=replace, p=weights) 
    File "mtrand.pyx", line 1391, in mtrand.RandomState.choice (numpy/random/mtrand/mtrand.c:16430) 

如果我不這樣做的.query(...)一部分,那麼它工作正常。如果我在樣本後應用查詢,這也是可以的,但是我無法控制最終的DataFrame大小。我在這裏嘗試做什麼有什麼不妥?

我正在運行OS X 10.10.5,fastparquet 0.0.5,dask 0.14.1,python 2.7.12。

+0

貴'.query'返回一個空數據幀?你可以用'pd.DataFrame()。sample()'從pandas中得到一個類似的錯誤,所以我在查詢之後檢查'len(df)'。 – TomAugspurger

+0

數據正常,len(df)以百萬爲單位。 – user1527390

+0

TomAugspurger的評論是正確的,儘管長度檢查不應該在daskdataframe查詢之後,而應該在dask.dataframe.sample(...)函數內(請參見下面的答案),因爲更多的時候一些分區是空的,但整個dask.dataframe不是空的。 – user1527390

回答

0

「ValueError:a必須大於0」錯誤被拋出,因爲一些熊貓DataFrame是空的。這個ValueError拋出了pandas.DataFrame.sample方法。因爲我們在dask查詢之後進行了抽樣,並且並非查詢的所有子任務都會產生非空的pandas.DataFrame,所以這個ValueError幾乎可以保證發生。

正確的修復應該在dask.dataframe代碼:返回df本身,如果它是空的,否則叫df.sample

> /opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py(166)sample() 
164 def sample(df, state, frac, replace): 
165  rs = np.random.RandomState(state) 
--> 166  return df.sample(random_state=rs, frac=frac, replace=replace) 

i.e. return df.sample(random_state=rs, frac=frac, replace=replace) \ 
      if len(df) > 0 else df