我有一個簡單的工作,觸發= 15秒,Source = kafka和Sink = S3。是否可以找到從卡夫卡下載郵件花了多少時間?或者說,如果我有Sink = Console,它會將驅動程序的數據帶回,是否有可能找到從卡夫卡下載數據的時間以及將驅動程序帶回驅動器的時間?如何計算從Kafka獲取記錄的時間?
從驅動程序我寫這些查詢到S3。是否有可能瞭解在從Kafka下載觸發執行時間= 44秒時花費了多少時間?
Streaming query made progress: {
id : 1383g52b-8de4-4e95-a3s9-aea73qe3ea56,
runId : 1206f5tc-t503-44r0-bc0c-26ce404w6724,
name : null,
timestamp : 2017-08-25T01:42:10.000Z,
numInputRows : 99998,
inputRowsPerSecond : 1666.6333333333334,
processedRowsPerSecond : 2263.9860535669814,
durationMs : {
addBatch : 42845,
getBatch : 3,
getOffset : 68,
queryPlanning : 6,
triggerExecution : 44169,
walCommit : 1245
},
stateOperators : [ ],
sources : [ {
description : KafkaSource[Subscribe[kafka_topic]],
startOffset : {
kafka_topic : {
2 : 20119244,
4 : 2,
1 : 20124601,
3 : 20113622,
0 : 20114208
}
},
endOffset : {
kafka_topic : {
2 : 20139245,
4 : 20143531,
1 : 20144592,
3 : 20133663,
0 : 20134192
}
},
numInputRows : 99998,
inputRowsPerSecond : 1666.6333333333334,
processedRowsPerSecond : 2263.9860535669814
} ],
sink : {
description : FileSink[s3://s3bucket]
}
}
謝謝!
我通過實施一個不做任何實際處理的ForEachSink解決了這個問題,這給了我純粹的下載kafka事件的時間。 ForeachSink:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala –