2017-08-22 45 views
2

我們可以看到StateStoreRestoreExec如下。什麼是「row +:savedState.toSeq」在StateStoreRestoreExec.doExecute中做什麼?

case class StateStoreRestoreExec(
    keyExpressions: Seq[Attribute], 
    stateId: Option[OperatorStateId], 
    child: SparkPlan) 
    extends UnaryExecNode with StateStoreReader { 

    override protected def doExecute(): RDD[InternalRow] = { 
    val numOutputRows = longMetric("numOutputRows") 

    child.execute().mapPartitionsWithStateStore(
    getStateId.checkpointLocation, 
    operatorId = getStateId.operatorId, 
    storeVersion = getStateId.batchId, 
    keyExpressions.toStructType, 
    child.output.toStructType, 
    sqlContext.sessionState, 
    Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => 
    val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) 
    iter.flatMap { row => 
     val key = getKey(row) 
     val savedState = store.get(key) 
     numOutputRows += 1 
     row +: savedState.toSeq 
    } 
} 

在這裏,我想知道的row +: savedState.toSeq意義。我認爲row是UnsafeRow的一個實例,savedState.toSeq是Seq的一個實例。那麼我們如何使用+:來操作它們。另一方面,我認爲savedState是UnsafeRow的一個實例,toSeq不是UnsaveRow的成員,那麼savedState.toSeq如何工作?

回答

2

rowInternalRow的實例,並且savedStateOption[UnsafeRow],其延伸InternalRow。這裏發生的情況是保存的狀態從Option[UnsafeRow]轉換爲Seq[UnsafeRow],然後row實例被預置爲該序列。

當您通過這些UnsafeRow對象返回Iterator[UnsafeRow]

+0

謝謝你的回答。我現在很清楚。另外,關鍵是什麼意思?並且你說「映射到這些UnsafeRow的每一個上」,我想你的意思是「在這些UnsafeRow中的每一個上的flapmap」 – GOGO

+0

@GOGO'StateStore'在鍵值對中保存內存中的狀態,以便在需要時可以快速查找key的值,就像'HashMap'的工作方式一樣(它實際上由內部的併發哈希映射支持以存儲關鍵值對)。 –

+0

謝謝你的回覆。我可以看到它實際上由一個併發哈希映射支持,但我不知道關鍵是什麼。例如,我可以使用一個以Int類型爲ID的ID,並使用一個名爲String類型的NAME作爲值來記錄映射ID-> NAME。但我對'StateStore'如何組織數據感到困惑。 – GOGO