2017-05-05 108 views
3

當在包含numpy數組的dask.bag上執行foldby時,我從dask/numpy得到非常無意義的FutureWarning消息。dask bag foldby with numpy arrays

def binop(a, b): 
    print('binop') 
    return a + b[1] 

def combine(a, b): 
    print('combine') 
    return a + b[1] 

seq = ((np.random.randint(0, 5, size=1)[0], np.ones(5,)) for _ in range(50)) 
db.from_sequence(seq, partition_size=10)\ 
    .foldby(0, binop=binop, initial=np.zeros(5,), combine=combine)\ 
    .compute() 

目標是加起來一堆NumPy數組。這會產生正確的結果,但也產生FutureWarning消息(看起來像每個分區一個)NumPy雖然它看起來好像他們來自dask

dask/async.py:247:FutureWarning:elementwise comparison failed;返回標代替,但在未來將執行的elementwise比較 回報FUNC(* args2)

只是增加了兩個numpy陣列,而不dask不會產生這樣有明確的一些參與與並行.foldby這裏。看起來在任何計算完成之前都會生成警告。

  • 如何確定警告是否應該關注?
  • 如果我應該關注它,我該如何讓警告消失?

我使用python 3.6 DASK 0.14.1和numpy的1.12.1

dask.bag.foldby


UPDATE

感謝@ MRocklin的答案,我開始尋找到這個多一點。因此,在dask.async.py有問題的代碼是this

def _execute_task(arg, cache, dsk=None): 
.... 
    if isinstance(arg, list): 
     return [_execute_task(a, cache) for a in arg] 
    elif istask(arg): 
     func, args = arg[0], arg[1:] 
     args2 = [_execute_task(a, cache) for a in args] 
     return func(*args2) 

是有可能,dask實際上是在試圖遍歷numpy數組中args2 = [_execute_task(a, cache) for a in args],我不知道內部不夠好(在所有的時候)的判斷這些變量包含的內容。

回答

2

這有事情做與cytoolz.itetoolz.reducebyinit值。將init從init=np.zeros((5,))更改爲init=lambda: np.zeros((5,))至少可以擺脫警告消息。

該警告是通過this line

cpdef dict reduceby(object key, object binop, object seq, object init='__no__default__'): 
... 
    cdef bint skip_init = init == no_default 

,其比較在初始化值(np.zeros((5,)))傳遞給使numpy失敗的carraystr元件兩比較字符串"__no__default__"生產。

所以回答我的問題:

  • 沒有,你不需要擔心這個警告,但它可能會在計劃在未來
  • 乾脆用放慢避免該警告可調用的init
  • 它似乎並沒有這個會不會有什麼重大的負面影響,但要記住的是,init調用將每一次執行過程被稱爲
2

這個警告確實來自numpy。通過代碼基地產量these lines快速搜索:

if (!strcmp(ufunc_name, "equal") || 
      !strcmp(ufunc_name, "not_equal")) { 
     /* Warn on non-scalar, return NotImplemented regardless */ 
     assert(nin == 2); 
     if (PyArray_NDIM(out_op[0]) != 0 || 
       PyArray_NDIM(out_op[1]) != 0) { 
      if (DEPRECATE_FUTUREWARNING(
        "elementwise comparison failed; returning scalar " 
        "instead, but in the future will perform elementwise " 
        "comparison") < 0) { 
       return -1; 
      } 
     } 

DASK可能使這一點變得更糟,因爲你會發現在每一個過程得到警告一次(dask.bag使用進程池默認情況下)。

另外,如果你的計算是通過numpy的束縛,那麼你可以考慮切換到線程調度,而不是多重調度

mybag.compute(get=dask.threaded.get) 

http://dask.pydata.org/en/latest/scheduler-choice.html

+0

我仍然不明白,雖然'dask'正在做的陣列產生的警告。該案例的評論爲:「這個條件基本上意味着」我們註定要失敗「,b/c」靈活的「dtypes - 字符串和無效 - 不能有自己註冊的ufunc循環...。這是沒有意義的,因爲數組的'dtype'是'float64',而不是'string'或'void'。請參閱上面修改的問題 –

+0

Dask.bag只是調用您提供的功能。它沒有引入任何特殊的邏輯。 – MRocklin

+0

我正確地認爲,如果'init'是一個可調用的函數,它會爲每個執行器調用一次嗎?這就是'cytoolz'代碼的樣子 –