2013-05-07 44 views
3

我在寫入IO緩衝區和套接字之間傳輸數據的線程時出現問題。我沒有任何問題讓它運行,但不是我想要的方式。下面是代碼草圖:沒有輪詢的基於選擇的套接字循環

s = socket(...) # some connection 
in_buffer = b'' # consumed by other thread 
out_buffer = b'' # produced by other thread 
while True: 
    (r, w, x) = select([s], [s], [s]) 
    if r: 
     in_buffer += s.recv(RECV_LIMIT) 
    if w: 
     sent = s.send(out_buffer) 
     out_buffer = out_buffer[sent:] 
    if x: 
     break 

問題在於它閒置時會佔用一個完整的CPU。原因是套接字大部分時間都是可寫的,特別是閒置時。立即select()回報,什麼也不做,再次呼籲select(),什麼都不做等有一個簡單的修復,不檢查一個可寫的插座,當你沒有什麼寫:

... # dito 
while True: 
    if out_buffer: 
     (r, w, x) = select([s], [s], [s]) 
    else: 
     (r, w, x) = select([s], [], [s]) 
    ... # dito 

這工作,但它有一個不同的問題:空閒時,這個塊無限制地在select()上。如果我添加一些東西到輸出緩衝區,我需要以某種方式喚醒來自accept()調用的線程,但是如何?爲了記錄,我當前的解決方法稍微改變了評估:

while True: 
    (r, w, x) = select([s], [s], [s]) 
    if x: 
     break 
    elif r: 
     in_buffer += s.recv(RECV_LIMIT) 
    elif w: 
     if out_buffer: 
      sent = s.send(out_buffer) 
      out_buffer = out_buffer[sent:] 
     else: 
      sleep(0.001) 

總之,當確實沒有什麼可做的時候,插入延遲。毫秒足以甚至不會消耗1%的CPU。類似的方法是使用select()調用的超時,然後重新檢查輸出數據的存在。儘管如此,這兩種解決方案都不是很好,因爲兩者都有效地歸結爲投票和投票。那麼,我該如何編寫一個IO線程,像這樣可移植並且無需輪詢?

注意:一種方法是添加另一個文件描述符,在該文件描述符上創建人造流量以便從阻塞select()調用中喚醒線程。在這裏,問題是select()只能在插座上移植使用,而不是例如。管道。或者,在MS Windows上,我可以將一個win32事件與一個套接字的狀態更改和另一個事件關聯起來(請參閱WSAEventSelect),但我不想將此代碼寫入非移植式WinSock API的頂層。

+0

發送字節上的一個套接字喚醒線程技巧工程在我的代碼罰款。如果您使用TCP或UDP套接字而不是管道,則也可以在Windows下執行此操作。 – 2013-05-07 21:26:24

+0

你建議創建一對loopback套接字來喚醒線程。是的,它肯定有效,毫無疑問。我很確定它涉及將數據從一個進程編組到內核的IP棧並回到相同的進程,這可能甚至比我目前的黑客還要慢。 – 2013-05-07 21:58:06

+1

我只能說我在我的(相當性能苛刻的,基於C++的)程序中使用了這個技術,我從來沒有注意到任何放緩因爲它。我認爲通過環回設備發送數據的數據路徑已經很好地優化了,並且在任何情況下,您只需要在每次喚醒時發送一個字節。鑑於你的代碼無論如何都在Python解釋器中運行,我不認爲你會注意到任何放緩。 (但請務必關閉Nagle的算法) – 2013-05-07 22:47:40

回答

1

這有點不清楚,爲什麼你需要讓這位中間人在這裏與select一起工作 - 這是你的問題的一個約束?在我看來,如果是這樣,那麼您必須將輸出緩衝區視爲需要準備好閱讀的資源,然後才能告訴您選擇了您有興趣編寫的內容。

看起來像這樣會大大簡化,如果你切換你的緩衝區的小字符串Queues傳遞。通過這種方式,你可以有兩個線程,與插座交互:

# One Thread consuming the socket 
while True: 
    (r, w, x) = select([s], [], [s]) 
    if r: 
     in_buffer.put(s.recv(RECV_LIMIT)) 
    if x: 
     break 

# And one Thread writing to the socket 
while True: 
    string = out_buffer.get() 
    (r, w, x) = select([], [s], [s]) 
    if w: 
     s.send(string) 
    if x: 
     break 

這樣的生產線可以安全地發出信號數據的準備,甚至被寫入。也就是說,select是一個非常低級的界面(就像socket那樣),我會考慮使用知道更多花裏胡哨的抽象。我偏好gevent,但它當然適用於IO綁定的應用程序,如果您受CPU限制,可能不太適合。在那裏生產者和消費者可以直接有效地與插座交互,從而不需要這個中間人:

import gevent 
from gevent import socket, sleep 

def producer(sock): 
    # We'll spit out some bytes every so often 
    while True: 
     sock.send('Hello from the producer!') 
     sleep(0.01) 

def consumer(sock): 
    # We'll read some in as long as we can 
    buffer = '' 
    while True: 
     buffer += sock.recv(100) 
     # If the buffer can be consumed, we'll consume it and reset 
     if len(buffer) > 500: 
      print 'Consuming buffer: %s' % buffer 
      buffer = '' 

def client(sock): 
    # This will emulate a client that prints what it recieves, but always 
    # sends the same message 
    while True: 
     sock.send('Hello from the client!') 
     print sock.recv(100) 

# Run this to get the server going 
listener = socket.socket() 
listener.bind(('127.0.0.1', 5001)) 
listener.listen(5) 
(sock, addr) = listener.accept() 
gevent.joinall([ 
    gevent.spawn(producer, sock), 
    gevent.spawn(consumer, sock) 
]) 

# Run this to get a client going 
connector = socket.socket() 
connector.connect(('127.0.0.1', 5001)) 
gevent.joinall([ 
    gevent.spawn(client, connector) 
]) 
+0

我認爲你在這裏提出一個很好的觀點,轉向更高層次的框架。我將不得不研究一下gevent,看看它的作用。不幸的是,它還不支持Python 3,儘管支持正在進行中。另一個我想知道的問題是,在兩個select()中使用相同的套接字是否能夠正常工作,我還沒有找到明確的答案。最後,它仍然困擾着我,似乎沒有一個乾淨的解決方案,使用Python自帶的工具。 – 2013-05-08 06:48:21

+1

啊,這是關於Python 3的一個好處。似乎並不明確禁止從兩個線程調用select,但最好在一個線程中進行異常檢查,但不能同時檢查這兩個線程。這似乎簡化了一個監聽線程。也就是說,在我看來,你總是在等待兩個事件:產生的數據和套接字上可用的數據。如果使用線程,對我來說這意味着兩個線程,每個線程監聽一個線程。我認爲第一個代碼片段在給出問題時是相當乾淨的,並且它只使用python核心模塊:-) – 2013-05-08 15:03:14

+0

當使用兩個線程時,仍然存在我在兩個線程中使用相同套接字的問題,即使只有一個而另一個只是在閱讀。此外,當我想關閉連接時,我仍然需要一種方法來喚醒這兩者。對於一個寫作來說,將信號值放入隊列很容易。在accept中等待的人不是,關閉套接字來喚醒線程再次是來自多個線程的使用,也是有問題的。也就是說,我不同意你的看法,兩件事並不意味着給我兩條線索。我寧願說服務一個資源(套接字)意味着一個線程。因人而異。 – 2013-05-13 20:38:36