2017-06-12 436 views
1

我試圖在Rx-cpp中實現observer/observable模式。這些是Rx.Net中的一個非常有趣的tutorial,有人可以這樣做。使用RxCpp構造觀察者/可觀察模式

在這個C#例子,有具體的interfaces,我們必須覆蓋:

public interface IObserver<in T> 
{ 
    void OnCompleted(); 
    void OnError(Exception error); 
    void OnNext(T value); 
} 


public interface IObservable<out T> 
{ 
    IDisposable Subscribe(IObserver<T> observer); 
} 

據我瞭解,在Rx-cpp沒有這樣的便利。那麼,是否有可能向我提供一些頭部示例(myObservable.h/myObserver.h),類似於上面的interfaces,我可以用它來指導定義相同的通信模式?

任何幫助,高度讚賞, 謝謝!

編輯1: 感謝@zentrunix,我試圖做一個面向類的通信。到目前爲止,我已經有了可觀察模式的代碼。我想要的是定義一個觀察者列表,這些觀察者列表將被添加到可觀察值中,當調用這些觀察者時,應該通知這些觀察者。但是,缺少一部分。

  1. 我怎樣才能subscribe()這些觀察員(Rx::subscribers<int>)當myObservable::Subscribe()函數被調用。
  2. 另外我怎樣才能unsubscribe()
  3. 最後,如何在多個onNext觀察員中看到相應的o.subscribe(onNext, onEnd);?是否有可能構建一個相應的類? (再次受到here的啓發)
  4. 對不起,但這是一個有意義的組織嗎?到目前爲止,我正在與tutorial中提供的體系結構一起工作,這就是我沉迷於此任務的原因。我發現它是一種參與RxCpp的方式。任何意見非常感謝。 (同樣對不起我的無知。)

    class myObservable { 
    
    private: 
    
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers; 
    
    public: 
    
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); }; 
    
    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) { 
    
        return Rx::observable<>::create<int>([&, out]() { 
         auto it = observers->insert(observers->end(), *out); 
         it->add([=]() { 
          observers->erase(it); 
         }); 
        }); 
    
    }; 
    
    void OnNext(int sendItem) { 
    
        for (Rx::subscriber<int> observer : *observers) { 
         (observer).on_next(sendItem); 
        } 
    } 
    
    void Disposer(Rx::subscriber<int> out) { 
    
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end()); 
    }; 
    }; 
    
+0

我想要做的是構造兩個繼承'RxCpp''observer'和'observable'函數的類。在'Rx.Net'示例中,這由'class myIObserver:IObserver '完成。這不是如何將類轉換爲「虛擬」,而是如何構造相應地執行與「SubjectObserver:IObserver 」和「SubjectObservable:IObservable 」相同功能的「myObserver.h」和「myObservable.h」 。感謝他的興趣。 – Thoth

+0

我附加'c#'''interfaces'的原因是因爲我應該實現相同的函數('OnComplete()','OnNext(T value)'等),其'虛擬'我沒有找到。 – Thoth

+0

我認爲你的問題需要重新表述。另外,我剛剛查看了一些RxCpp源代碼(例如https://github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/rxcpp/rx-observer.hpp),而不是看到任何看起來像界面的東西。我想你可能會以完全錯誤的方式來解決這個問題。 – Rook

回答

2

在下面RxCpp一個很簡單的例子。 儘管有(至少)一個警告:典型的RxCpp代碼大量使用lambda,我不太喜歡它。

我也試圖找到在互聯網上的文檔和教程,但找不到任何。我對關於線程模型的解釋特別感興趣。

如果您願意通過代碼和Doxygen文檔進行溝通,RxCpp GitHub站點中有很多示例。

#include <iostream> 
#include <exception> 

#include "rxcpp/rx.hpp" 
namespace rx = rxcpp; 

static void onNext(int n) { std::cout << "* " << n << "\n"; } 
static void onEnd() { std::cout << "* end\n"; } 

static void onError(std::exception_ptr ep) 
{ 
    try { std::rethrow_exception(ep); } 
    catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; } 
} 

static void observableImpl(rx::subscriber<int> s) 
{ 
    s.on_next(1); 
    s.on_next(2); 
    s.on_completed(); 
} 

int main() 
{ 
    auto o = rxcpp::observable<>::create<int>(observableImpl); 
    std::cout << "*\n"; 
    o.subscribe(onNext, onEnd); 
}