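I have created a river that runs every hour to fetch data from the DB (using the jdbc river plugin). How can I get the max _id from Elasticsearch?
Instead of
select * from orders
I want to select based on the primary key, so that the query appends only new data instead of all records:
select * from orders where deviceid > '(Max Id in Elastic search)'
How can I get the max _id from Elasticsearch?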
Since ES insists on converting the value of "_id" to a string, there seems to be no way to use the "_id" field directly. But there is a workaround.
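Even if an aggregation accepted string IDs, a string maximum would not be the numeric maximum an incremental query needs. A minimal Python illustration (the ID values are made up for the example):

```python
# Hypothetical ID values; Elasticsearch returns "_id" as a string.
string_ids = ["1", "2", "9", "10"]

# Comparing strings is lexicographic: "9" sorts after "10" because
# the first characters compare as '9' > '1'.
lexicographic_max = max(string_ids)

# Converting to integers gives the ordering an incremental query needs.
numeric_max = max(int(i) for i in string_ids)

print(lexicographic_max)  # 9
print(numeric_max)        # 10
```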
First, I set up a simple index with a few documents, like this:
PUT /test_index
{
"settings": {
"number_of_shards": 1
}
}
POST /test_index/_bulk
{"index":{"_index":"test_index","_type":"doc","_id":1}}
{"title":"first doc"}
{"index":{"_index":"test_index","_type":"doc","_id":2}}
{"title":"second doc"}
{"index":{"_index":"test_index","_type":"doc","_id":3}}
{"title":"third doc"}
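If the bulk request is generated by a script rather than typed by hand, the format is one action line plus one source line per document, each ending in a newline. A minimal standard-library sketch; the function name build_bulk_body is hypothetical and not part of any Elasticsearch client:

```python
import json


def build_bulk_body(index, doc_type, docs):
    """Build a newline-delimited JSON body for the _bulk endpoint.

    docs is an iterable of (doc_id, source) pairs. When doc_id is None,
    "_id" is left out of the action line so Elasticsearch assigns it
    (or derives it from a mapped path).
    """
    lines = []
    for doc_id, source in docs:
        action = {"_index": index, "_type": doc_type}
        if doc_id is not None:
            action["_id"] = doc_id
        # Each document needs an action line followed by its source line.
        lines.append(json.dumps({"index": action}))
        lines.append(json.dumps(source))
    # The bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


body = build_bulk_body(
    "test_index",
    "doc",
    [(1, {"title": "first doc"}),
     (2, {"title": "second doc"}),
     (3, {"title": "third doc"})],
)
print(body)
```

Passing None as the ID omits "_id" from the action line, which is what the later request relying on the "doc_id" path does.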
Then I tried a max aggregation, but got an error because the "_id" values are strings:
POST /test_index/_search?search_type=count
{
"aggs": {
"max_id": {
"max": {
"field": "_id"
}
}
}
}
...
{
"error": "SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures {[bQS7TqO9SfKSPQZYVXQBag][test_index][0]: ClassCastException[org.elasticsearch.index.fielddata.plain.PagedBytesIndexFieldData cannot be cast to org.elasticsearch.index.fielddata.IndexNumericFieldData]}]",
"status": 500
}
So that doesn't work. However, it does work with a small modification using the "path" parameter of the "_id" field.
So I redefined the index:
DELETE /test_index
PUT /test_index
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"doc": {
"_id": {
"path": "doc_id"
}
}
}
}
and then indexed the documents, letting "_id" come from the "doc_id" path:
POST /test_index/_bulk
{"index":{"_index":"test_index","_type":"doc"}}
{"title":"first doc","doc_id":1}
{"index":{"_index":"test_index","_type":"doc"}}
{"title":"second doc","doc_id":2}
{"index":{"_index":"test_index","_type":"doc"}}
{"title":"third doc","doc_id":3}
Now if I search, I can see that "_id" is still a string, but "doc_id" is an integer:
POST /test_index/_search
...
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "test_index",
"_type": "doc",
"_id": "1",
"_score": 1,
"_source": {
"title": "first doc",
"doc_id": 1
}
},
{
"_index": "test_index",
"_type": "doc",
"_id": "2",
"_score": 1,
"_source": {
"title": "second doc",
"doc_id": 2
}
},
{
"_index": "test_index",
"_type": "doc",
"_id": "3",
"_score": 1,
"_source": {
"title": "third doc",
"doc_id": 3
}
}
]
}
}
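The type difference is easy to check programmatically. A short Python sketch that parses a trimmed copy of this response with the standard json module (the helper name id_pairs is hypothetical):

```python
import json

# A trimmed copy of the search response shown above.
RESPONSE = """
{"hits": {"total": 3, "hits": [
  {"_id": "1", "_source": {"title": "first doc", "doc_id": 1}},
  {"_id": "2", "_source": {"title": "second doc", "doc_id": 2}},
  {"_id": "3", "_source": {"title": "third doc", "doc_id": 3}}
]}}
"""


def id_pairs(search_response):
    """Return (_id, doc_id) for every hit in a parsed search response."""
    return [
        (hit["_id"], hit["_source"].get("doc_id"))
        for hit in search_response["hits"]["hits"]
    ]


pairs = id_pairs(json.loads(RESPONSE))
for es_id, doc_id in pairs:
    # "_id" comes back as str, while "doc_id" keeps its JSON number type.
    print(repr(es_id), type(es_id).__name__, repr(doc_id), type(doc_id).__name__)
```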
So now I can easily find the max ID value with a max aggregation:
POST /test_index/_search?search_type=count
{
"aggs": {
"max_id": {
"max": {
"field": "doc_id"
}
}
}
}
...
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"max_id": {
"value": 3
}
}
}
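A script that consumes this response only needs the "aggregations" part. A minimal sketch; the helper name extract_max_id is hypothetical, and treating a missing result as "value": null is an assumption about how an empty index is reported:

```python
import json

# The aggregation response shown above, reduced to the relevant part.
RESPONSE = '{"hits": {"total": 3, "hits": []}, "aggregations": {"max_id": {"value": 3}}}'


def extract_max_id(agg_response, agg_name="max_id"):
    """Return the max aggregation result as an int, or None if absent."""
    value = agg_response["aggregations"][agg_name]["value"]
    if value is None:
        # Assumption: an empty index yields "value": null.
        return None
    # The max aggregation computes a double (e.g. 3.0); convert it back
    # to an integer ID before putting it into a SQL query.
    return int(value)


print(extract_max_id(json.loads(RESPONSE)))  # 3
```

Because the aggregation works in doubles, integer IDs above 2**53 could lose precision on the way through.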
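Thank you for your answer. Now I am able to get the max value. How should I use it in a query such as SELECT * FROM orders WHERE deviceid > '(Max Id in Elastic search)'? How do I substitute the value for "(Max Id in Elastic search)"? Note: the river is scheduled to run every hour, so every time the river/query runs it should be able to get the max value from Elasticsearch. – ABC 2015-03-03 09:37:16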
You'll probably have to write some kind of script to handle that part. Maybe a Python script run by a cron job or something. – 2015-03-03 15:40:56
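Following that suggestion, a cron-driven Python script could look roughly like this sketch. It uses only the standard library; the URL, the helper names fetch_max_id and build_incremental_query, and the assumption that "doc_id" mirrors the orders.deviceid column are all hypothetical. It only produces the SQL string; how that string reaches the jdbc river's configuration is not covered here.

```python
import json
import urllib.request

# Assumptions: Elasticsearch listens on localhost:9200, the index is
# test_index, and its doc_id field mirrors the orders.deviceid column.
ES_SEARCH_URL = "http://localhost:9200/test_index/_search?search_type=count"


def fetch_max_id(url=ES_SEARCH_URL):
    """POST the max aggregation and return the result as an int or None."""
    payload = json.dumps({"aggs": {"max_id": {"max": {"field": "doc_id"}}}})
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        result = json.load(response)
    value = result["aggregations"]["max_id"]["value"]
    return None if value is None else int(value)


def build_incremental_query(max_id):
    """Return the SQL for the next river run (hypothetical helper)."""
    if max_id is None:
        # Nothing indexed yet: fall back to a full load.
        return "select * from orders"
    # int() guarantees only a number is interpolated into the SQL text.
    return "select * from orders where deviceid > %d" % int(max_id)
```

An hourly cron job would call build_incremental_query(fetch_max_id()) before each river run. Dropping the quotes around the value assumes deviceid is numeric; for a string column the comparison would be lexicographic.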
How can I give the script as input to the parameters tab in the jdbc river plugin? – ABC 2015-03-09 11:47:09