2010-10-20 39 views
6

我有一個程序,做一個有限的多線程形式。它是用Delphi編寫的,並使用libmysql.dll(C API)訪問MySQL服務器。該程序必須處理一長串記錄,每記錄約0.1秒。把它看作一個大循環。所有的數據庫訪問都由工作線程完成,這些線程可以預取下一個記錄或寫入結果,因此主線程不必等待。如何才能使SQL查詢線程啓動,然後在獲取結果之前做其他工作?

在這個循環的頂部,我們首先等待預取線程,獲取結果,然後讓預取線程爲下一條記錄執行查詢。這個想法是預取線程將立即發送查詢,並在主線程完成循環時等待結果。

它經常這樣工作。但是請注意,沒有什麼可以確保預取線程立即運行。我發現通常這個查詢並沒有被髮送,直到主線程循環並開始等待預取。

我通過在啓動預取線程後立即調用sleep(0)來解決這個問題。通過這種方式,主線程放棄了剩餘的時間片,希望預取線程現在可以運行,併發送查詢。然後線程在等待時會睡眠,這就允許主線程再次運行。
當然,在操作系統中運行的線程還有很多,但這在一定程度上確實有效。

我真正想要發生的是主線程發送查詢,然後讓工作線程等待結果。我使用libmysql.dll在工作線程中調用

result := mysql_query(p.SqlCon,pChar(p.query)); 

。相反,我想讓主線程調用類似於

mysql_threadedquery(p.SqlCon,pChar(p.query),thread); 

這會在數據發佈後立即切換任務。

有人知道這樣的事嗎?

這實際上是一個調度問題,所以我可以嘗試以更高的優先級開啓預取線程,然後在發送查詢後降低它的優先級。但是,再次,我沒有任何mysql調用分開發送查詢和接收結果。

也許它在那裏,我只是不知道它。請賜教。

問題補充:

有誰覺得這個問題會通過更高的優先級比主線程中運行的預取線程來解決?這個想法是,預取將立即搶佔主線程併發送查詢。然後它會睡覺等待服務器回覆。同時主線程將運行。

新增:當前實現

這個程序執行包含在一個MySQL數據庫的數據進行計算的詳細信息。有33M項目每秒增加更多。程序不斷運行,處理新項目,有時重新分析舊項目。它從表格中獲得要分析的項目列表,因此在通過(當前項目)的開始處,它知道它將需要的下一個項目ID。

由於每個項目都是獨立的,因此這是多處理的理想目標。最簡單的方法是在多臺機器上運行該程序的多個實例。該程序通過分析,重寫和算法重新設計高度優化。當數據不足時,單個實例仍然使用100%的CPU內核。我在兩個四核工作站上運行4-8份拷貝。但以這樣的速度,他們必須花時間在MySQL服務器上等待。 (優化服務器/數據庫模式是另一個主題。)

我在此過程中實現了多線程,目的只是爲了避免阻塞SQL調用。這就是我稱之爲「有限多線程」的原因。工作者線程有一項任務:發送命令並等待結果。 (OK,兩項任務。)

原來有6個阻塞任務與6個表相關聯。其中兩個讀取數據和另外4個寫入結果。這些足夠相似,可以由一個通用的任務結構來定義。一個指向這個Task的指針被傳遞給一個線程池管理器,它分配一個線程來完成這項工作。主線程可以通過任務結構檢查任務狀態。

這使得主線程代碼非常簡單。當它需要執行Task1時,它會等待Task1不忙,將SQL命令放入Task1並將其傳遞。當Task1不再忙時,它包含結果(如果有的話)。

寫入結果的4個任務是微不足道的。主線程有一個任務寫入記錄,當它進入下一個項目時。完成該項目後,確保先前的寫入在完成之前完成。

2個閱讀線程並不重要。將讀取傳遞給一個線程,然後等待結果,將不會獲得任何結果。相反,這些任務預取下一個項目的數據。所以來到這個阻塞任務的主線程檢查預取是否完成;如果需要,等待預取完成,然後從任務中獲取數據。最後,它用NEXT Item ID重新發布任務。

這個想法是爲預取任務立即發出查詢並等待MySQL服務器。然後主線程可以處理當前項目,並且在下一個項目開始時,它需要的數據位於預取任務中。

所以線程,線程池,同步,數據結構等都完成了。所有的作品。我剩下的是一個調度問題。

調度問題是這樣的:所有的速度增益都在處理當前項目,而服務器正在獲取下一個項目。我們在處理當前項目之前發出預取任務,但我們如何保證它開始?操作系統調度程序不知道預取任務對於立即發出查詢很重要,然後它只會等待。

操作系統調度程序試圖「公平」並允許每個任務運行指定的時間片。我最糟糕的情況是這樣的:主線程收到它的片併發出預取,然後完成當前項目並且必須等待下一個項目。等待釋放剩下的時間片,所以調度器啓動預取線程,該線程發出查詢然後等待。現在兩個線程都在等待。當服務器發出查詢完成信號時,預取線程重新啓動,並請求結果(數據集)然後休眠。當服務器提供預取線程喚醒的結果時,標記任務完成並終止。最後,主線程重新啓動並從完成的任務中獲取數據。

爲了避免這種最壞情況的調度,我需要一些方法來確保預取查詢是在主線程繼續使用當前項目之前發出的。到目前爲止,我認爲有三種方式來做到這一點:在發出預取任務後,右鍵

  1. ,主線程調用Sleep(0)。這應該放棄其餘的時間片。我然後希望調度程序運行預取線程,它將發出查詢,然後等待。然後調度程序應該重新啓動主線程(我希望)。儘管它聽起來很糟糕,但實際上它比沒有更好。

  2. 我可能會發出預取線程比主線程更高的優先級。這應該會導致調度程序立即運行它,即使它必須搶佔主線程。它也可能有不良影響。後臺工作者線程獲得更高的優先級似乎是不自然的。

  3. 我可能異步發出查詢。也就是說,單獨從接收結果發送查詢。這樣我可以讓主線程使用mysql_send_query(非阻塞)發送預取並繼續使用當前項目。然後,當它需要下一個項目時,它會調用mysql_read_query,這會阻塞,直到數據可用。

請注意,解決方案3甚至不使用工作線程。這看起來像是最好的答案,但需要重寫一些底層代碼。我目前正在尋找這種異步客戶端 - 服務器訪問的例子。

我也想就這些方法提供任何有經驗的意見。我錯過了什麼,或者我做錯了什麼?請注意,這是所有工作代碼。我不是在問怎麼做,而是如何做得更好/更快。

+0

縱觀mysql.pas包裝,我發現了兩個函數mysql_send_query和mysql_read_query,聽起來像我需要的東西。谷歌然後讓我去http://jan.kneschke.de/2008/9/9/async-mysql-queries-with-c-api/誰寫道:「...是公開的,但沒有記錄。不阻止我們。「這看起來很有希望,但我仍然可以使用關於如何正確操作的建議。 – 2010-10-20 05:56:25

+1

通常你有一個處理查詢的線程,不是發送查詢,而是另一個查詢結果。 – 2010-10-20 08:01:45

+0

是的,我現在就這樣做。我有一個預取線程發送查詢來選擇下一條記錄。理論上,我應該在線程等待結果時處理當前記錄。在實踐中,不能保證預取線程馬上啓動。 – 2010-10-21 01:31:19

回答

0

你只需要使用Delphi線程的標準線程同步機制。

檢查您的IDE幫助的TEvent類及其關聯的方法。

1

我不知道任何允許這個的數據庫訪問層。

原因是每個線程都有自己的「thread local storage」(Delphi中的threadvar關鍵字,其他語言有等價物,它在很多框架中使用)。
當你在一個線程上啓動一些東西並在另一個線程上繼續時,那麼你會將這些本地存儲混合在一起造成各種破壞。

你能做的最好的是這樣的:

  1. 通過查詢和參數的線程將處理這(使用標準的德爾福線程同步機制,這一點)
  2. 有實際的查詢線程中執行查詢
  3. 結果主線程(使用標準的德爾福線程同步機制,這一點)

的答案返回this question更詳細地解釋了線程同步。

編輯:(上開始在其他線程的東西的推測緩慢)

「向右走」是一個相對的概念:這取決於你如何做你的線程同步,並可以非常非常快(即少於一毫秒)。
創建新線程可能需要一些時間。
解決方案是使工作線程的線程池足夠大,以高效的方式處理合理數量的請求。
這樣,如果系統還沒有太忙,你將有一個工作線程準備立即開始爲你的請求提供服務。

我在一個需要低延遲響應的大型音頻應用程序中完成了這項工作(甚至是跨進程),並且它的工作原理類似於魅力。
音頻服務器進程以高優先級等待請求運行。當它閒置時,它不會消耗CPU,但當它收到請求時,它的響應速度非常快。

this question on changes with big improvementsthis question on cross thread communication的回答提供了一些有關如何獲得此異步行爲的有趣提示。
查找單詞AsyncCalls,OmniThreadthread

--jeroen

+0

這就是我在幾個版本之前完成的。它太慢了。在任何情況下,它都不能解決問題 - 這是爲了確保線程立即發送查詢。 – 2010-10-21 01:26:59

+0

@Guy:看我的編輯。 – 2010-10-21 06:56:54

+0

感謝您的澄清。我將通過編輯上面的問題來擴展我對現在如何做的解釋。 (這些評論框很小。)歡迎評論和/或批評。 – 2010-10-24 15:56:00

1

我把在第二個答案,你的問題的第二部分:您調度問題 這使得更容易區分兩個答案。

首先,您應該閱讀Consequences of the scheduling algorithm: Sleeping doesn't always help這是Raymond Chen的博客「The Old New Thing」的一部分。
Sleeping versus polling也是很好的閱讀。
基本上all these做好閱讀。

如果我正確理解你的調度問題,你有3種線程:

  1. 主線:可以確保獲取線程總是有工作要做
  2. 提取主題:(數據庫綁定)獲取數據用於處理線程
  3. 處理線程:(CPU結合的)處理獲取的數據

保持3運行的唯一方法是具有2取儘可能多的數據,因爲他們可以。
保持2次提取的唯一方法是讓1提供足夠的條目以提取。

您可以使用隊列1和2之間以及2和3之間

你現在的問題是雙重的傳遞數據:

  • 找到線程的數量之間的平衡在第2類和3
  • 確保2總是有工作要做

我想你已經解決了前者。
後者歸結爲確保1和2之間的隊列永遠不會爲空。

一些技巧:

  • 您可以使用睡眠(1)(請參閱博客文章)作爲一個簡單的方法來「強制」 2運行
  • 不要讓胎面退出其執行:創建和銷燬線程是昂貴
  • 選擇您的同步對象(通常稱爲IPC對象)仔細(Kudzu對他們nice article

--jeroen

+0

謝謝Jeroen。這是我正在尋找的。閱讀所有這些鏈接需要一段時間。正如你所看到的,我現在對我的做法並不滿意。但是該程序啓動了單線程,並添加了簡單的工作線程,就像我給它提供了巨大的性能提升,而沒有太多複雜性。在單個進程中使用隊列和線程進行全面重寫可能是時候了。在主流程中有更多的並行性被利用。我現在所做的不會擴展到更多核心。 – 2010-10-25 22:46:21

+0

謝謝Jeroen。這是我正在尋找的。閱讀所有這些鏈接需要一段時間。正如你所看到的,我現在對我的做法並不滿意。但是該程序啓動了單線程,並添加了簡單的工作線程,就像我給它提供了巨大的性能提升,而沒有太多複雜性。在單個進程中使用隊列和線程進行全面重寫可能是時候了。在主流程中有更多的並行性被利用。我現在所做的不會擴展到更多核心。 – 2010-10-25 22:46:42

+0

@Guy:我很高興我能幫上忙。可惜,很少有人花時間閱讀你的優秀問題,因爲它應該已經被提高了。也許你應該改變問題的標題,更多地關注「調度」部分。 – 2010-10-26 07:43:41

4

不過,單個實例在沒有數據缺乏時使用100%的CPU內核。我在兩個四核工作站上運行4-8份拷貝。

我在這裏有一個概念上的問題。在你的情況下,我會創建一個多進程解決方案,每個進程都在單線程中完成所有任務,或者我將創建一個多線程解決方案,僅限於任何特定機器上的單個實例。一旦你決定使用多個線程並接受增加的難以解決的錯誤的複雜性和可能性,那麼你應該最大限度地利用它們。使用具有多個線程的單個進程允許您使用不同數量的線程來讀取和寫入數據庫並處理數據。線程的數量甚至可能會在程序的運行時間內發生變化,並且數據庫和處理線程的比率也可能會變化。只有在程序中的一個點上控制所有線程時,才能對這種工作進行動態分區,而這對多個進程來說是不可能的。

我在這個過程中實現了多線程,只是爲了避免阻塞SQL調用。

對於多個進程,不會有真正的需要這樣做。如果你的進程是I/O限制的,有些時候他們不佔用CPU資源,所以你可能只需要運行更多的進程而不是你的機器有內核。但是,如果機器還有其他工作,那麼你就會知道需要產生多少個進程,並且這個進程可能會隨着時間的推移而再次發生變化。一個過程中的線程解決方案可以以相對簡單的方式適應不斷變化的環境。

所以線程,線程池,同步,數據結構等都完成了。所有的作品。我剩下的是一個調度問題。

你應該留給操作系統。只需要一個具有必要池線程的進程。類似如下:

  • 多個線程從數據庫中讀取記錄並將它們添加到一個生產者 - 消費者隊列與一個上限,這是某處Ñ2 * N之間其中N是系統中處理器內核的數量。這些線程將阻塞整個隊列,並且它們可以具有更高的優先級,以便當隊列有更多空間並且它們變得暢通時它們將被安排運行。由於它們在大多數情況下會在I/O上被阻塞,因此它們的優先級應該不會成爲問題。
    我不知道這個線程數是多少,你需要測量。

  • 大量處理線程,可能是系統中每個處理器核心的一個。他們將從上一點提到的隊列中取得工作項目,如果該隊列是空的,則在該隊列上阻止工作項目。已處理的工作項目應該轉到另一個隊列。

  • 大量線程從第二個隊列中處理工作項並將數據寫回數據庫。對於第二個隊列也應該有一個上限,以便將未處理的數據寫回數據庫不會導致已處理的數據堆積並填滿所有進程內存空間。

線程數量需要確定,但所有調度都將由OS調度程序執行。關鍵是要有足夠的線程來利用所有的CPU核心以及必要數量的輔助線程來使它們忙碌並處理它們的輸出。如果這些線程來自池,您可以在運行時自由調整它們的數量。

Omni Thread Library有一個解決方案的任務,任務池,生產者消費者隊列和你需要實現這一切。否則,您可以使用互斥鎖編寫自己的隊列。

調度問題是這樣的:所有的速度增益都在處理當前項目,而服務器正在獲取下一個項目。我們在處理當前項目之前發出預取任務,但我們如何保證它開始?

通過給予它更高的優先級。

OS調度器不知道預取任務發出查詢馬上

它就會知道,如果線程具有更高的優先級是很重要的。

OS調度程序試圖保持「公平」並允許每個任務運行指定的時間片。

僅限具有相同優先級的線程。沒有較低優先級的線程將獲得任何CPU片段,而同一進程中的較高優先級線程可以運行。
[編輯:這不完全正確,更多的信息在最後。但是,它是足夠接近真相,以確保較高優先級網絡線程發送和儘快接收數據。]

  1. 權限發行預取任務後,主線程調用Sleep(0 )。

調用Sleep()是一個糟糕的方式來強制線程按照一定的順序來執行。根據它們執行的工作的優先級設置線程優先級,並且如果它們不應該運行,則使用OS基元來阻塞更高優先級的線程。

我可能會發出比主線程更高優先級的預取線程。這應該會導致調度程序立即運行它,即使它必須搶佔主線程。它也可能有不良影響。後臺工作者線程獲得更高的優先級似乎是不自然的。

這沒什麼不自然的。這是使用線程的預期方式。您只需確保較高優先級的線程遲早會阻塞,任何進入I/O(文件或網絡)的OS線程都會阻塞。在我所描述的高優先級線程之上的方案也會阻塞隊列。

我可能異步發出查詢。

我不會去那裏。當你爲許多同時連接編寫服務器並且每個連接的線程過於昂貴時,這種技術可能是必需的,但是否則阻止線程化解決方案中的網絡訪問應該可以正常工作。

編輯:

感謝的Jeroen Pluimers的捅到仔細看這個。由於他在評論中給出的鏈接中的信息顯示我的聲明

沒有較低優先級的線程將獲得任何CPU片,而同一進程中的較高優先級線程可運行。

是不正確的。即使優先級較高的線程可運行,長時間未運行的優先級較低的線程也會獲得隨機優先級提升,並確實遲早會獲得CPU的份額。有關詳細信息,請參閱"Priority Inversion and Windows NT Scheduler"

爲了測試這一點,我創建了一個簡單的演示用Delphi:

type 
    TForm1 = class(TForm) 
    Label1: TLabel; 
    Label2: TLabel; 
    Label3: TLabel; 
    Label4: TLabel; 
    Label5: TLabel; 
    Label6: TLabel; 
    Timer1: TTimer; 
    procedure FormCreate(Sender: TObject); 
    procedure FormDestroy(Sender: TObject); 
    procedure Timer1Timer(Sender: TObject); 
    private 
    fLoopCounters: array[0..5] of LongWord; 
    fThreads: array[0..5] of TThread; 
    end; 

var 
    Form1: TForm1; 

implementation 

{$R *.DFM} 

// TTestThread 

type 
    TTestThread = class(TThread) 
    private 
    fLoopCounterPtr: PLongWord; 
    protected 
    procedure Execute; override; 
    public 
    constructor Create(ALowerPriority: boolean; ALoopCounterPtr: PLongWord); 
    end; 

constructor TTestThread.Create(ALowerPriority: boolean; 
    ALoopCounterPtr: PLongWord); 
begin 
    inherited Create(True); 
    if ALowerPriority then 
    Priority := tpLower; 
    fLoopCounterPtr := ALoopCounterPtr; 
    Resume; 
end; 

procedure TTestThread.Execute; 
begin 
    while not Terminated do 
    InterlockedIncrement(PInteger(fLoopCounterPtr)^); 
end; 

// TForm1 

procedure TForm1.FormCreate(Sender: TObject); 
var 
    i: integer; 
begin 
    for i := Low(fThreads) to High(fThreads) do 
// fThreads[i] := TTestThread.Create(True, @fLoopCounters[i]); 
    fThreads[i] := TTestThread.Create(i >= 4, @fLoopCounters[i]); 
end; 

procedure TForm1.FormDestroy(Sender: TObject); 
var 
    i: integer; 
begin 
    for i := Low(fThreads) to High(fThreads) do begin 
    if fThreads[i] <> nil then 
     fThreads[i].Terminate; 
    end; 
    for i := Low(fThreads) to High(fThreads) do 
    fThreads[i].Free; 
end; 

procedure TForm1.Timer1Timer(Sender: TObject); 
begin 
    Label1.Caption := IntToStr(fLoopCounters[0]); 
    Label2.Caption := IntToStr(fLoopCounters[1]); 
    Label3.Caption := IntToStr(fLoopCounters[2]); 
    Label4.Caption := IntToStr(fLoopCounters[3]); 
    Label5.Caption := IntToStr(fLoopCounters[4]); 
    Label6.Caption := IntToStr(fLoopCounters[5]); 
end; 

這創建(我的4芯的機器上)6個線程,或者所有具有較低優先級,或4與正常和2具有較低優先級。在第一種情況下,所有6個線程運行,但與CPU時間完全不同的股:

6 threads with lower priority

在第二種情況下4個線程與CPU時間大致相等份額運行,但其它兩個線程得到一個小CPU的佔有率,以及:

4 threads with normal, 2 threads with lower priority

但CPU時間的份額,是非常非常小,對其他線程收到什麼百分比如下方式。

回到您的問題:使用具有自定義優先級的多個線程的程序,通過生產者 - 消費者隊列耦合,應該是一個可行的解決方案。在正常情況下,數據庫線程會在網絡操作或隊列上阻塞大部分時間。而Windows調度程序將確保即使是較低優先級的線程也不會完全餓死。

+0

'優先級較低的線程將獲得任何CPU片,而同一進程中較高優先級的線程可以運行。我不確定是否屬實,請參閱:http://blogs.msdn.com/b/oldnewthing/archive/2005/10/03/476413.aspx +1。 – 2010-10-25 07:35:23

+0

@Jeroen:正在運行的線程表示當前沒有運行。因此,如果所有更高優先級的線程正在運行或被阻塞,則空閒內核將只運行較低優先級的線程。我應該相應地編輯我的答案嗎? – mghie 2010-10-25 07:53:28

+0

@mghie:請做;也請研究這是否確實會發生。我似乎還記得,即使有高優先級的線程在運行,低優先級線程也可以獲得一些CPU時間。還有像「動態優先」和「關鍵部分」。 Windows線程調度器是一個複雜的野獸;它使很多有趣的閱讀,例如:http://stackoverflow.com/questions/656959/win32-thread-scheduling,http://support.microsoft.com/kb/96418和http:// msdn。 microsoft.com/en-us/library/ms684831(VS.85).aspx – 2010-10-25 13:42:38

相關問題