2016-08-15 47 views
13

使用聚合框架可以計算一階導數嗎?使用MongoDB彙總框架計算一階導數

例如,我有數據:

{time_series : [10,20,40,70,110]} 

我試圖得到類似的輸出:

{derivative : [10,20,30,40]} 
+0

是有原因你想在聚合框架內VS做到這一點。使用強大的python庫實現? – JohnnyHK

+0

@JohnnyHK - 你可以給我一個python庫實現的例子嗎? 我目前的解決方法是使用pymongo獲取所有的字段,並在Python中進行派生。事實證明,它很慢(受網絡帶寬限制?),這讓我四處尋找替代品。 – user666

+0

@JohnnyHK我認爲聚合框架是最好的選擇。甚至比['numpy.diff']快(https://docs.scipy.org/doc/numpy/reference/generated/numpy.diff.html)。我將基準測試結果添加到了我的回答中 – styvane

回答

6
db.collection.aggregate(
    [ 
     { 
     "$addFields": { 
      "indexes": { 
      "$range": [ 
       0, 
       { 
       "$size": "$time_series" 
       } 
      ] 
      }, 
      "reversedSeries": { 
      "$reverseArray": "$time_series" 
      } 
     } 
     }, 
     { 
     "$project": { 
      "derivatives": { 
      "$reverseArray": { 
       "$slice": [ 
       { 
        "$map": { 
        "input": { 
         "$zip": { 
         "inputs": [ 
          "$reversedSeries", 
          "$indexes" 
         ] 
         } 
        }, 
        "in": { 
         "$subtract": [ 
         { 
          "$arrayElemAt": [ 
          "$$this", 
          0 
          ] 
         }, 
         { 
          "$arrayElemAt": [ 
          "$reversedSeries", 
          { 
           "$add": [ 
           { 
            "$arrayElemAt": [ 
            "$$this", 
            1 
            ] 
           }, 
           1 
           ] 
          } 
          ] 
         } 
         ] 
        } 
        } 
       }, 
       { 
        "$subtract": [ 
        { 
         "$size": "$time_series" 
        }, 
        1 
        ] 
       } 
       ] 
      } 
      }, 
      "time_series": 1 
     } 
     } 
    ] 
) 

我們可以使用3.4+以上版本的流水線來做到這一點。 在流水線中,我們使用$addFields流水線階段。運營商的陣列添加「time_series」的元素的索引做文件,我們也扭轉了時間序列陣列,通過分別$range$reverseArray運營商

其添加到文檔中,我們在這裏扭轉了數組,因爲該元素在數組中的位置p總是比p+1位置,這意味着[p] - [p+1] < 0,我們不希望在這裏使用$multiply元素更大。(見3.2版本管道)

接下來我們用$zipped的時間序列數據索引數組並使用將substract表達式應用於結果數組10個操作員。

然後我們$slice結果從數組中丟棄null/None值並重新調換結果。


在3.2我們可以使用$unwind操作者退繞我們的陣列,並通過指定一個文檔作爲操作數,而不是通過$前綴傳統的「路徑」包括陣列中的每個元素的索引。

下的管道,我們需要$group我們的文件,並使用$push累加器操作符返回的是像這樣子文檔的數組:

{ 
    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"), 
    "time_series" : [ 
     { "value" : 10, "index" : NumberLong(0) }, 
     { "value" : 20, "index" : NumberLong(1) }, 
     { "value" : 40, "index" : NumberLong(2) }, 
     { "value" : 70, "index" : NumberLong(3) }, 
     { "value" : 110, "index" : NumberLong(4) } 
    ] 
} 

最後是$project階段。在這個階段,我們需要使用$map運算符對$group階段中新計算的數組中的每個元素應用一系列表達式。

這裏是在$map內部發生的(見$map作爲for循環)表達:

對於每個子文檔中,我們將字段分配給使用$let變量操作符的變量。然後我們從數組中下一個元素的「value」字段的值中減去它的值。

由於數組中的下一個元素是當前索引加一處的元素,所有我們需要的是$arrayElemAt運營商的幫助和簡單$add銀行足球比賽當前元素的索引和1的。

$subtract表達式返回負值,所以我們需要使用$multiply運算符將值乘以-1

我們還需要$filter結果數組,因爲它的最後一個元素是Nonenull。原因是噹噹前元素是最後一個元素時,$subtract返回None,因爲下一個元素的索引等於數組的大小。

db.collection.aggregate([ 
    { 
    "$unwind": { 
     "path": "$time_series", 
     "includeArrayIndex": "index" 
    } 
    }, 
    { 
    "$group": { 
     "_id": "$_id", 
     "time_series": { 
     "$push": { 
      "value": "$time_series", 
      "index": "$index" 
     } 
     } 
    } 
    }, 
    { 
    "$project": { 
     "time_series": { 
     "$filter": { 
      "input": { 
      "$map": { 
       "input": "$time_series", 
       "as": "el", 
       "in": { 
       "$multiply": [ 
        { 
        "$subtract": [ 
         "$$el.value", 
         { 
         "$let": { 
          "vars": { 
          "nextElement": { 
           "$arrayElemAt": [ 
           "$time_series", 
           { 
            "$add": [ 
            "$$el.index", 
            1 
            ] 
           } 
           ] 
          } 
          }, 
          "in": "$$nextElement.value" 
         } 
         } 
        ] 
        }, 
        -1 
       ] 
       } 
      } 
      }, 
      "as": "item", 
      "cond": { 
      "$gte": [ 
       "$$item", 
       0 
      ] 
      } 
     } 
     } 
    } 
    } 
]) 

我認爲這是低效率的另一種選擇是進行地圖/使用map_reduce方法減少對我們的回收操作。

>>> import pymongo 
>>> from bson.code import Code 
>>> client = pymongo.MongoClient() 
>>> db = client.test 
>>> collection = db.collection 
>>> mapper = Code(""" 
...    function() { 
...     var derivatives = []; 
...     for (var index=1; index<this.time_series.length; index++) { 
...     derivatives.push(this.time_series[index] - this.time_series[index-1]); 
...     } 
...     emit(this._id, derivatives); 
...    } 
...    """) 
>>> reducer = Code(""" 
...    function(key, value) {} 
...    """) 
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']: 
...  print(res) # or do something with the document. 
... 
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')} 

您也可以檢索所有文檔,並使用numpy.diff返回這樣的衍生物:

import numpy as np 


for document in collection.find({}, {'time_series': 1}): 
    result = np.diff(document['time_series']) 
4

這是一個有點髒,但也許這樣的事情?

use test_db 
db['data'].remove({}) 
db['data'].insert({id: 1, time_series: [10,20,40,70,110]}) 

var mapF = function() { 
    emit(this.id, this.time_series); 
    emit(this.id, this.time_series); 
}; 

var reduceF = function(key, values){ 
    var n = values[0].length; 
    var ret = []; 
    for(var i = 0; i < n-1; i++){ 
     ret.push(values[0][i+1] - values[0][i]); 
    } 
    return {'gradient': ret}; 
}; 

var finalizeF = function(key, val){ 
    return val.gradient; 
} 

db['data'].mapReduce(
    mapF, 
    reduceF, 
    { out: 'data_d1', finalize: finalizeF } 
) 

db['data_d1'].find({}) 

的「策略」在這裏是爲發射要被操作的數據的兩倍,以便它是可訪問的在降低階段,返回一個目的是避免消息「減少 - >多個尚不支持」然後在終結器中過濾數組。

這個腳本然後產生:

MongoDB shell version: 3.2.9 
connecting to: test 
switched to db test_db 
WriteResult({ "nRemoved" : 1 }) 
WriteResult({ "nInserted" : 1 }) 
{ 
    "result" : "data_d1", 
     "timeMillis" : 13, 
     "counts" : { 
      "input" : 1, 
      "emit" : 2,  
      "reduce" : 1,   
      "output" : 1      
     },          
     "ok" : 1          
}             
{ "_id" : 1, "value" : [ 10, 20, 30, 40 ] }   
bye 

或者,可以將所有加工成終結(reduceF這裏沒有所謂,因爲mapF假定發出獨特的鍵):

use test_db 
db['data'].remove({}) 
db['data'].insert({id: 1, time_series: [10,20,40,70,110]}) 

var mapF = function() { 
    emit(this.id, this.time_series); 
}; 

var reduceF = function(key, values){ 
}; 

var finalizeF = function(key, val){ 
    var x = val; 
    var n = x.length; 

    var ret = []; 
    for(var i = 0; i < n-1; i++){ 
     ret.push(x[i+1] - x[i]); 
    } 
    return ret; 
} 

db['data'].mapReduce(
    mapF, 
    reduceF, 
    { out: 'data_d1', finalize: finalizeF } 
) 

db['data_d1'].find({})