2016-11-16
0

I am new to Spark. I am trying to figure out Spark's block eviction policy. Some people say it is LRU, for example this article and this one. What is Spark's current eviction policy, FIFO or LRU?

However, when I looked at the source code of MemoryStore and BlockManager, I could not find the LRU logic:

  1. There is a LinkedHashMap in MemoryStore that records all blocks:

    // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and 
    // acquiring or releasing unroll memory, must be synchronized on `memoryManager`! 
    private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true) 
    
  2. When a block is accessed, it is not moved to the head of the LinkedHashMap:

    def getValues(blockId: BlockId): Option[Iterator[_]] = {
      val entry = entries.synchronized { entries.get(blockId) }
      entry match {
        case null => None
        case e: SerializedMemoryEntry[_] =>
          throw new IllegalArgumentException("should only call getValues on deserialized blocks")
        case DeserializedMemoryEntry(values, _, _) =>
          val x = Some(values)
          x.map(_.iterator)
      }
    }
    
  3. The logic for evicting blocks is shown below.
  4. The blocks to evict are selected in the order of the LinkedHashMap's entrySet, which I assumed to be first-in, first-out.

    private[spark] def evictBlocksToFreeSpace(
        blockId: Option[BlockId],
        space: Long,
        memoryMode: MemoryMode): Long = {
      assert(space > 0)
      memoryManager.synchronized {
        var freedMemory = 0L
        val rddToAdd = blockId.flatMap(getRddId)
        val selectedBlocks = new ArrayBuffer[BlockId]
        def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
          entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
        }
        // This is synchronized to ensure that the set of entries is not changed
        // (because of getValue or getBytes) while traversing the iterator, as that
        // can lead to exceptions.
        entries.synchronized {
          val iterator = entries.entrySet().iterator()
          while (freedMemory < space && iterator.hasNext) {
            val pair = iterator.next()
            val blockId = pair.getKey
            val entry = pair.getValue
            if (blockIsEvictable(blockId, entry)) {
              // We don't want to evict blocks which are currently being read, so we need to obtain
              // an exclusive write lock on blocks which are candidates for eviction. We perform a
              // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
              if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
                selectedBlocks += blockId
                freedMemory += pair.getValue.size
              }
            }
          }
        }
        ...
        if (freedMemory >= space) {
          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
            s"(${Utils.bytesToString(freedMemory)} bytes)")
          for (blockId <- selectedBlocks) {
            val entry = entries.synchronized { entries.get(blockId) }
            // This should never be null as only one task should be dropping
            // blocks and removing entries. However the check is still here for
            // future safety.
            if (entry != null) {
              dropBlock(blockId, entry)
            }
          }
          ...
        }
      }
    }
    

So, is Spark's eviction policy FIFO or LRU?

Answers

-1

I had the same question before, and the answer is rather tricky: in the code you pasted here, there is no explicit "promote" operation. But in fact, LinkedHashMap is a special data structure that can maintain LRU order by itself.
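This is easy to check outside of Spark. The following is a minimal standalone Java sketch (the block names are made up, and this is not Spark code): with `accessOrder = true`, a plain `get()` is the implicit "promote" operation, moving the entry to the tail of the iteration order.

```java
import java.util.LinkedHashMap;

public class AccessOrderDemo {
    public static void main(String[] args) {
        // Same constructor arguments as MemoryStore.entries:
        // accessOrder = true means iteration order is least-recently-accessed first
        LinkedHashMap<String, Integer> entries = new LinkedHashMap<>(32, 0.75f, true);
        entries.put("block-A", 1);
        entries.put("block-B", 2);
        entries.put("block-C", 3);

        // get() itself reorders the map: block-A moves to the tail
        entries.get("block-A");

        // Iteration now starts with the least recently used entry
        System.out.println(entries.keySet()); // [block-B, block-C, block-A]
    }
}
```

So no explicit promotion code is needed in MemoryStore; the reordering happens inside `LinkedHashMap.get()`.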

0

You have the LinkedHashMap constructor in this line: `private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)`. This is the constructor of LinkedHashMap that creates an access-ordered map: `LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder)`. The boolean is set to true, which means the keys are ordered by access, from least recently accessed to most recently accessed.
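The effect of that third constructor argument can be seen by comparing the two modes side by side (a standalone Java sketch with made-up key names, not Spark code): with `accessOrder = false` the map keeps pure insertion order (FIFO), and with `accessOrder = true` it keeps access order (LRU).

```java
import java.util.LinkedHashMap;

public class ConstructorDemo {
    public static void main(String[] args) {
        // accessOrder = false (the default): pure insertion order (FIFO)
        LinkedHashMap<String, Integer> fifo = new LinkedHashMap<>(32, 0.75f, false);
        // accessOrder = true: access order (LRU), as in MemoryStore.entries
        LinkedHashMap<String, Integer> lru = new LinkedHashMap<>(32, 0.75f, true);

        for (String k : new String[]{"b1", "b2", "b3"}) {
            fifo.put(k, 0);
            lru.put(k, 0);
        }

        // Reading b1 reorders only the access-ordered map
        fifo.get("b1");
        lru.get("b1");

        System.out.println(fifo.keySet()); // [b1, b2, b3] -- unchanged
        System.out.println(lru.keySet());  // [b2, b3, b1] -- b1 promoted to the tail
    }
}
```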

+0

How does this answer the question? – luk2302

+0

Read my answer again! I said: "The boolean is set to true, which means the keys are ordered by access, from least recently accessed to most recently accessed." So the eviction policy is LRU. The blocks are ordered in the entries LinkedHashMap according to their access order. The blocks selected for eviction follow the order of the LinkedHashMap's entrySet, which means the first block to be evicted is the least recently used block –
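Putting the two points together, the selection loop in evictBlocksToFreeSpace can be simulated in a few lines (a simplified standalone Java sketch, not Spark's actual code; block ids and sizes are made up, and locking and the same-RDD check are omitted): iterating the access-ordered entrySet until enough space is freed picks the least recently used blocks first, not the first-inserted ones.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EvictionOrderDemo {
    public static void main(String[] args) {
        // Simplified model: block id -> size in bytes, access-ordered like MemoryStore.entries
        LinkedHashMap<String, Long> entries = new LinkedHashMap<>(32, 0.75f, true);
        entries.put("rdd_0_0", 100L);
        entries.put("rdd_0_1", 100L);
        entries.put("rdd_0_2", 100L);

        // Touch rdd_0_0 so it becomes the most recently used block
        entries.get("rdd_0_0");

        // Select blocks in iteration order until `space` bytes would be freed,
        // mirroring the while loop in evictBlocksToFreeSpace
        long space = 150L;
        long freedMemory = 0L;
        List<String> selectedBlocks = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iterator = entries.entrySet().iterator();
        while (freedMemory < space && iterator.hasNext()) {
            Map.Entry<String, Long> pair = iterator.next();
            selectedBlocks.add(pair.getKey());
            freedMemory += pair.getValue();
        }

        // The least recently used blocks are selected, even though
        // rdd_0_0 was inserted first
        System.out.println(selectedBlocks); // [rdd_0_1, rdd_0_2]
    }
}
```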

+0

This answer is helpful and correct, thanks niko – leon