2016-01-22 102 views
4

阻止來自com.datastax.driver.core.Session這種方法卡桑德拉如何處理阻斷datastax Java驅動程序

public ResultSet execute(Statement statement); 

評論執行fethod執行語句:

此方法一直阻塞至少從 數據庫收到了一些結果。但是,對於SELECT查詢,它並不保證 結果已被完整接收。但它確實保證從數據庫接收到一些 響應,特別是 保證如果請求無效,則通過此方法將拋出異常 。

非阻塞從com.datastax.driver.core.Session

public ResultSetFuture executeAsync(Statement statement); 

此方法不會阻止執行fethod。只要查詢已經傳送到底層網絡堆棧,它就會返回 。特別是,從 返回時,此方法不保證查詢有效或甚至已將 提交給活動節點。在訪問{@link ResultSetFuture}時,任何與查詢失敗 有關的異常都將被拋出。

我有關於他們的02個問題,因此,如果你能幫助我理解他們,那將是非常好的。

比方說,我有100萬條記錄,我希望所有這些記錄都到達數據庫(沒有丟失)。

問題1:如果我有線程數爲n,所有的線程將有他們需要發送到數據庫中記錄的相同金額。他們都使用阻止執行調用繼續向cassandra發送多個插入查詢。如果我增加n的值,它是否也有助於加快我需要將所有記錄插入cassandra的時間?

這會導致cassandra性能問題嗎? Cassandra是否必須確保對於每個插入記錄,羣集中的所有節點都應該立即知道新記錄?爲了保持數據的一致性。 (我假設cassandra節點甚至不會考慮使用本地機器時間來控制記錄插入時間)。

問題2:使用非阻塞執行,我如何確保所有的插入操作都成功?我知道的唯一方法是等待ResultSetFuture檢查插入查詢的執行情況。有沒有更好的辦法可以做到?非阻塞執行更容易失敗,然後阻止執行嗎?

非常感謝您的幫助。

回答

5

如果我有n個線程,所有線程將有相同數量的記錄,他們需要發送到數據庫。他們都使用阻止執行調用繼續向cassandra發送多個插入查詢。如果我增加n的值,它是否也有助於加快我需要將所有記錄插入cassandra的時間?

在某種程度上。讓我們稍微離開客戶端實現細節,並從「併發請求數」的角度來看待事物,因爲如果使用executeAsync,則不需要爲每個正在進行的請求設置線程。在我的測試中,我發現雖然併發請求數量很大,但有一個閾值,即收益遞減或性能開始降低。我的一般經驗法則是(number of Nodes *native_transport_max_threads (default: 128)* 2),但您可能會發現更多或更少的更優化的結果。

這裏的想法是,在排隊更多的請求方面沒有什麼價值超過cassandra一次可以處理的。在減少進入請求的次數的同時,可以限制驅動程序客戶端與cassandra之間的連接不必要的擁塞。

問題2:在非阻塞執行的情況下,如何確保所有插入操作都成功?我知道的唯一方法是等待ResultSetFuture檢查插入查詢的執行情況。有沒有更好的辦法可以做到?非阻塞執行更容易失敗,然後阻止執行嗎?

通過get等待ResultSetFuture是一個路由,但是如果您正在開發完全異步應用程序,則希望儘可能避免阻塞。使用番石榴,你的兩個最好的武器是Futures.addCallbackFutures.transform

  • Futures.addCallback允許您註冊當驅動器接收到的響應是被執行的FutureCallbackonSuccess在成功案例中得到執行,否則onFailure

  • Futures.transform允許您將返回的ResultSetFuture有效映射到其他內容中。例如,如果您只需要1列的值,則可以使用它將ListenableFuture<ResultSet>轉換爲ListenableFuture<String>,而無需在代碼中阻止ResultSetFuture,然後獲取字符串值。

在寫的DataLoader程序的情況下,你可以這樣做以下:

  1. 爲了簡單起見,使用Semaphore或一些其他結構具有固定的許可數(將成爲您的機上請求的最大數量)。無論何時您使用executeAsync提交查詢,都需要獲得許可證。您應該只需要1個線程(但可能需要引入一個#cpu內核大小的池)來從Semaphore獲取許可並執行查詢。它會阻止收購,直到有一個可用的許可證。
  2. 使用Futures.addCallback爲將來從executeAsync返回。在onSuccessonFailure兩種情況下,回調應呼叫Sempahore.release()。通過釋放許可證,這應該允許您的步驟1中的線程繼續並提交下一個請求。

爲了進一步提高吞吐量,您可能需要考慮使用BatchStatement並批量提交請求。如果將批次保持較小(50-250是一個好數字),並且批次中的插入共享相同的分區密鑰,則這是一個不錯的選擇。

+0

我沒有在'nodes * native_transport_max_threads'位上出售。特別是,推理(沒有太多的價值排隊更多的請求比卡桑德拉將一次處理)假設旅行時間是即時/微不足道。如果我的客戶端和cassandra節點之間的單程時間爲100ms,並且服務器可以在2ms內處理請求,那麼我希望一次將電線放在〜50。這裏的想法是,我現在連線的人會在大約100ms內到達,在那段時間內,服務器可以處理大約50條消息,並且我想讓服務器保持繁忙狀態,並始終確保它已經工作 –