2017-02-22 98 views
0

如果此問題已經有答案,但是如果有答案,我無法在此網站上找到答案。在單個線程上處理來自多個線程的請求 - .NET Core

首先 - 這個問題是特定於.NET核心(V1.1.0在寫作時)

我有一個第三方組件,該組件將只處理向它提出的請求,如果他們在同一線程上發起。此程序集是RabbitMQ庫和問題的詳細信息,可能與can be found here相關或可能不相關。基本上 - 我有多個線程可能會調用這個程序集 - 但是如果請求來自不同的線程 - 則會引發異常。

所以 - 爲了解決這個問題,我試圖創建一個被阻塞的線程,因此不會過期 - 並且必須以某種方式在此線程上處理對此程序集的所有調用。

我的第一次嘗試是創建一個事件並訂閱被阻止的線程上的事件。那麼任何其他線程都會開始調用這個事件,我認爲可能會在正確的線程中找到這個事件,這樣我就可以實現我在單線程上處理第三方程序集請求的願望。

我現在(痛苦左右)瞭解到it is not possible in .NET core to begin-invoke an event in .Net core :(

例子演示了此問題:

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     Program program = new Program(); 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 
     program.StartNewThreadToBeCalled(resetEvent); 

     program.CallBobMultipleTimes(resetEvent); 

     Console.ReadLine(); 
    } 

    private void CallBobMultipleTimes(ManualResetEvent resetEvent) 
    { 
     resetEvent.WaitOne(); 
     for(int i=0 ; i<100 ; i++) 
      ThreadPool.QueueUserWorkItem(x=>CallBob(null, null)); //Can't BeginInvoke in .NET Core 
    } 

    private void StartNewThreadToBeCalled(ManualResetEvent resetEvent) 
    { 
     ThreadPool.QueueUserWorkItem(x=> 
     { 
      Bob bob = new Bob(); 

      CallBob += (obj, e)=> bob.InvokeMeOnOneThreadOnly(); 
      resetEvent.Set();     
      ManualResetEvent mre = new ManualResetEvent(false); 
      mre.WaitOne(); //Real implementation will block forever - this should be the only thread handles the processing of requests. 
     }); 
    } 

    public event EventHandler CallBob; 
} 

public class Bob 
{ 
    private List<int> ThreadIdentifiers = new List<int>(); 
    private static object SyncObject = new object(); 


    public void InvokeMeOnOneThreadOnly() 
    { 
     lock(SyncObject) 
     { 
      int currentThreadId = Thread.CurrentThread.ManagedThreadId; 
      if(ThreadIdentifiers.Any()) 
      { 
       if(!ThreadIdentifiers.Contains(currentThreadId)) 
        Console.WriteLine("Don't call me from multiple threads!"); 
      } 
      else 
       ThreadIdentifiers.Add(currentThreadId); 
     } 
    } 
} 

我已經通過創建圍繞concurrentQueue該通知什麼時候加一個包裝了這個工作然後我在我的特殊第三方組裝線程上處理這個事件,並從這個隊列中挑選請求,直到它被用盡......不知道爲什麼這是在另一種方式不行的時候工作?!

有沒有比我找到的更好的方法來處理來自多個不同線程在.NET核心中的單個線程上的多個請求?

+0

由於隊列是標準的實現消費者生產模式不是很奇怪ÿ你有工作......沒有辦法知道爲什麼「其他方式」不起作用,因爲沒有對「另一種方式」的明確解釋(「通過創建代表」不是一種)。如果你需要一個答案,你需要澄清你以「其他方式」嘗試了什麼,或者解釋你想以什麼方式改進隊列解決方案 - 在當前狀態下,帖子感覺過於寬泛。 –

+0

@Alexei當我發佈時,我即將走出工作 - 所以沒有時間獲得更多更詳細的帖子,我會在稍後獲得空閒時間時嘗試使用代碼示例進行擴展。 – Jay

+0

@AlexeiLevenkov我已經更新了一個代碼片段,它解釋了我遇到的問題。忘記其他我已經嘗試過的東西,現在問題說明了我的原始問題,一個回購,它歸結了我關於原始問題的代碼並解釋了我所做的工作。我的問題真的是 - 做這項工作的正確方法是什麼?我對併發隊列做的事情感覺很糟糕。 – Jay

回答

2

這很難說出你想從代碼中做什麼。但是,另一種方法是使用BlockingCollection<T>。後臺線程可能值添加到集合和一個單獨的線程可以坐下,並嘗試獲取值從集合中,例如:

while(continueProcessingCollection) 
{ 
    if(blockingCollection.TryTake(out value, TimeSpan.FromSeconds(myTimeout))) 
    { 
     // process value, decide to toggle continueProcessingCollection 
    } 
} 

後臺線程可以添加新值阻塞集合:

blockingCollection.Add(newValue); 

而且,地方你需要創建blockingCollection變量:

var blockingCollection = new BlockingCollection<MyType>(); 
+0

我剛剛在MSDN上看過這個,所以我的理解(現在)是,如果你調用TryTake(),它會阻塞,直到有東西被添加到集合中?那麼當有東西被添加時,它會解鎖並且代碼會繼續運行? – Jay

+0

@jay如果集合中有東西,它將返回true;如果超時,則返回false。你需要知道停止循環的條件(超時,處理的事物的數量等等)。但是,只要你的超時(我用我的例子1秒)就會阻塞。 –

+0

然後這正是我需要的 - 比我的hacky包裝併發隊列好得多。謝謝! – Jay

1

delegateBeginInvoke使用threadpool調度callback這不會解決你的問題。對於多個生產者和一個消費者來說,這基本上是一個Producer–consumer問題。 .NET核心包括some structures實現 IProducerConsumerCollection<T>你可以用來解決這個問題。

不要與Control.BeginInvoke(winform)和Dispatcher.BeginInvoke(WPF)混淆,它會在GUI線程上對回調進行排隊。

+0

我會看看這種模式,因爲我沒有意識到這有一個.net核心接口。謝謝,我會讓你知道我是如何得到的 – Jay