2016-07-24 63 views
0

我正在編寫Akka.Persistence for Service Fabric的實現,並且我似乎無法獲取快照的工作方式。當它嘗試恢復狀態時,它會獲取最新的快照,但它不會重播自最新快照以來的事件。它對我不清楚如果我沒有正確連接組件,或者如果我的持久性庫的實現不正確。 我的演員是一個簡單的計數器,我的狀態只是當前的計數。 我希望恢復應該先被調用,然後在最後一個快照和最高序列號之間爲每個日記條目調用Recover。日記中有一個函數ReplayMessagesAsync(...),它看起來應該這樣做,但它不會被調用。 我的計數器代碼如下,我的代碼的其餘部分是:Code爲什麼Akka.Persistance不會重播我的日誌條目

using Akka.Actor; 
using Akka.Persistence; 
using Akka.Persistence.ServiceFabric.Journal; 
using Akka.Persistence.ServiceFabric.Snapshot; 
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 

namespace AkkaPersistence.Actors 
{ 
    public class Counter : ReceivePersistentActor 
    { 
     public class GetCount { } 

     private int counter; 

     private CounterState State = new CounterState(); 

     private int _msgsSinceLastSnapshot = 0; 

     public Counter() 
     { 
      Recover<Evt>(evt => 
      { 
       State.Update(evt); 
      }); 

      Recover<SnapshotOffer>(offer => { 
       var snapshotEntry = offer.Snapshot as SnapshotEntry; 
       if (snapshotEntry != null) 
       { 
        State = (CounterState)snapshotEntry.Snapshot; 
       } 
      }); 

      Command<string>(str => Persist(str, s => 
      { 
       ++counter; 
       var evt = new Evt(s); 
       State.Update(evt); 

       if (++_msgsSinceLastSnapshot % 10 == 0) 
       { 
        //time to save a snapshot 
        SaveSnapshot(State.Copy()); 
       } 
      })); 

      Command<GetCount>(get => Sender.Tell(State.Count)); 

      Command<SaveSnapshotSuccess>(success => 
      { 
       ServiceEventSource.Current.Message($"Saved snapshot"); 
       DeleteMessages(success.Metadata.SequenceNr); 
      }); 

      Command<SaveSnapshotFailure>(failure => { 
       // handle snapshot save failure... 
       ServiceEventSource.Current.Message($"Snapshot failure"); 
      }); 

     } 

     public override string PersistenceId 
     { 
      get 
      { 
       return "counter"; 
      } 
     } 
    } 

    internal class CounterState 
    { 
     private long count = 0L; 

     public long Count 
     { 
      get { return count; } 
      set { count = value; } 
     } 

     public CounterState(long count) 
     { 
      this.Count = count; 
     } 

     public CounterState() : this(0) 
     { 
     } 

     public CounterState Copy() 
     { 
      return new CounterState(count); 
     } 

     public void Update(Evt evt) 
     { 
      ++Count; 
     } 
    } 

    public class Evt 
    { 
     public Evt(string data) 
     { 
      Data = data; 
     } 

     public string Data { get; } 

    } 

    public class Cmd 
    { 
     public Cmd(string data) 
     { 
      Data = data; 
     } 

     public string Data { get; } 
    } 

} 

回答

0

有一對夫婦的事情,我有錯: 1)我需要回到什麼傳入的,不是我的SnapshotEntry這是我的持久性機制的實現細節。 2)我從保存字符串轉換爲嘗試將對象保存爲日記的一部分時出現了一個簡單的錯誤。 3)最後還有一個問題,那就是潛在的問題,那就是序列化與子對象一起失敗。在這段代碼中,我不希望包含子對象的類型,因此我爲日誌以及現有的SnapshotSerializer添加了一個自定義序列化程序(the Wire serializer),它現在正在工作。

相關問題