2
我們可以看到StateStoreRestoreExec如下。什麼是「row +:savedState.toSeq」在StateStoreRestoreExec.doExecute中做什麼?
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
}
}
在這裏,我想知道的row +: savedState.toSeq
意義。我認爲row是UnsafeRow的一個實例,savedState.toSeq是Seq的一個實例。那麼我們如何使用+:
來操作它們。另一方面,我認爲savedState是UnsafeRow的一個實例,toSeq不是UnsaveRow的成員,那麼savedState.toSeq
如何工作?
謝謝你的回答。我現在很清楚。另外,關鍵是什麼意思?並且你說「映射到這些UnsafeRow的每一個上」,我想你的意思是「在這些UnsafeRow中的每一個上的flapmap」 – GOGO
@GOGO'StateStore'在鍵值對中保存內存中的狀態,以便在需要時可以快速查找key的值,就像'HashMap'的工作方式一樣(它實際上由內部的併發哈希映射支持以存儲關鍵值對)。 –
謝謝你的回覆。我可以看到它實際上由一個併發哈希映射支持,但我不知道關鍵是什麼。例如,我可以使用一個以Int類型爲ID的ID,並使用一個名爲String類型的NAME作爲值來記錄映射ID-> NAME。但我對'StateStore'如何組織數據感到困惑。 – GOGO