使用聚合框架可以計算一階導數嗎?使用MongoDB彙總框架計算一階導數
例如,我有數據:
{time_series : [10,20,40,70,110]}
我試圖得到類似的輸出:
{derivative : [10,20,30,40]}
使用聚合框架可以計算一階導數嗎?使用MongoDB彙總框架計算一階導數
例如,我有數據:
{time_series : [10,20,40,70,110]}
我試圖得到類似的輸出:
{derivative : [10,20,30,40]}
db.collection.aggregate(
[
{
"$addFields": {
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
"$project": {
"derivatives": {
"$reverseArray": {
"$slice": [
{
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
{
"$arrayElemAt": [
"$$this",
0
]
},
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
我們可以使用3.4+以上版本的流水線來做到這一點。 在流水線中,我們使用$addFields
流水線階段。運營商的陣列添加「time_series」的元素的索引做文件,我們也扭轉了時間序列陣列,通過分別$range
和$reverseArray
運營商
其添加到文檔中,我們在這裏扭轉了數組,因爲該元素在數組中的位置p
總是比p+1
位置,這意味着[p] - [p+1] < 0
,我們不希望在這裏使用$multiply
元素更大。(見3.2版本管道)
接下來我們用$zipped
的時間序列數據索引數組並使用將substract
表達式應用於結果數組10個操作員。
然後我們$slice
結果從數組中丟棄null/None
值並重新調換結果。
在3.2我們可以使用$unwind
操作者退繞我們的陣列,並通過指定一個文檔作爲操作數,而不是通過$前綴傳統的「路徑」包括陣列中的每個元素的索引。
下的管道,我們需要$group
我們的文件,並使用$push
累加器操作符返回的是像這樣子文檔的數組:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
最後是$project
階段。在這個階段,我們需要使用$map
運算符對$group
階段中新計算的數組中的每個元素應用一系列表達式。
這裏是在$map
內部發生的(見$map
作爲for循環)在表達:
對於每個子文檔中,我們將值字段分配給使用$let
變量操作符的變量。然後我們從數組中下一個元素的「value」字段的值中減去它的值。
由於數組中的下一個元素是當前索引加一處的元素,所有我們需要的是$arrayElemAt
運營商的幫助和簡單$add
銀行足球比賽當前元素的索引和1
的。
$subtract
表達式返回負值,所以我們需要使用$multiply
運算符將值乘以-1
。
我們還需要$filter
結果數組,因爲它的最後一個元素是None
或null
。原因是噹噹前元素是最後一個元素時,$subtract
返回None
,因爲下一個元素的索引等於數組的大小。
db.collection.aggregate([
{
"$unwind": {
"path": "$time_series",
"includeArrayIndex": "index"
}
},
{
"$group": {
"_id": "$_id",
"time_series": {
"$push": {
"value": "$time_series",
"index": "$index"
}
}
}
},
{
"$project": {
"time_series": {
"$filter": {
"input": {
"$map": {
"input": "$time_series",
"as": "el",
"in": {
"$multiply": [
{
"$subtract": [
"$$el.value",
{
"$let": {
"vars": {
"nextElement": {
"$arrayElemAt": [
"$time_series",
{
"$add": [
"$$el.index",
1
]
}
]
}
},
"in": "$$nextElement.value"
}
}
]
},
-1
]
}
}
},
"as": "item",
"cond": {
"$gte": [
"$$item",
0
]
}
}
}
}
}
])
我認爲這是低效率的另一種選擇是進行地圖/使用map_reduce
方法減少對我們的回收操作。
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
您也可以檢索所有文檔,並使用numpy.diff
返回這樣的衍生物:
import numpy as np
for document in collection.find({}, {'time_series': 1}):
result = np.diff(document['time_series'])
這是一個有點髒,但也許這樣的事情?
use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})
var mapF = function() {
emit(this.id, this.time_series);
emit(this.id, this.time_series);
};
var reduceF = function(key, values){
var n = values[0].length;
var ret = [];
for(var i = 0; i < n-1; i++){
ret.push(values[0][i+1] - values[0][i]);
}
return {'gradient': ret};
};
var finalizeF = function(key, val){
return val.gradient;
}
db['data'].mapReduce(
mapF,
reduceF,
{ out: 'data_d1', finalize: finalizeF }
)
db['data_d1'].find({})
的「策略」在這裏是爲發射要被操作的數據的兩倍,以便它是可訪問的在降低階段,返回一個目的是避免消息「減少 - >多個尚不支持」然後在終結器中過濾數組。
這個腳本然後產生:
MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult({ "nRemoved" : 1 })
WriteResult({ "nInserted" : 1 })
{
"result" : "data_d1",
"timeMillis" : 13,
"counts" : {
"input" : 1,
"emit" : 2,
"reduce" : 1,
"output" : 1
},
"ok" : 1
}
{ "_id" : 1, "value" : [ 10, 20, 30, 40 ] }
bye
或者,可以將所有加工成終結(reduceF
這裏沒有所謂,因爲mapF
假定發出獨特的鍵):
use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})
var mapF = function() {
emit(this.id, this.time_series);
};
var reduceF = function(key, values){
};
var finalizeF = function(key, val){
var x = val;
var n = x.length;
var ret = [];
for(var i = 0; i < n-1; i++){
ret.push(x[i+1] - x[i]);
}
return ret;
}
db['data'].mapReduce(
mapF,
reduceF,
{ out: 'data_d1', finalize: finalizeF }
)
db['data_d1'].find({})
是有原因你想在聚合框架內VS做到這一點。使用強大的python庫實現? – JohnnyHK
@JohnnyHK - 你可以給我一個python庫實現的例子嗎? 我目前的解決方法是使用pymongo獲取所有的字段,並在Python中進行派生。事實證明,它很慢(受網絡帶寬限制?),這讓我四處尋找替代品。 – user666
@JohnnyHK我認爲聚合框架是最好的選擇。甚至比['numpy.diff']快(https://docs.scipy.org/doc/numpy/reference/generated/numpy.diff.html)。我將基準測試結果添加到了我的回答中 – styvane