2017-08-29 116 views
0

我試圖建立一個無阻塞的UDP服務器,它偵聽不同的端口並接收數據包直到超時。不幸的是,我不能改變客戶端,UDP是強制性的。 接收文件正常工作。問題在於,創建工人是一項阻塞操作。我想讓它不被阻塞,所以所有的工作人員都是並行調用的。同樣,每個工人都應該像True一樣循環運行,但也是阻塞的。Python中的多線程套接字

這裏是我的代碼:

#!/usr/bin/env python 
from socket import * 
import sys 
import select 
import threading 
threads = [] 

def worker(port): 
     host="192.168.88.51" 
     s = socket(AF_INET,SOCK_DGRAM) 
     s.bind((host,port)) 
     addr = (host,port) 
     buf=128 
     data,addr = s.recvfrom(buf) 
     filename = str(port)+".data" 
     print str(port)+" received File:" 
     f = open(filename,'wb') 

     data, addr = s.recvfrom(buf) 
     try: 
      while(data): 
       f.write(data) 
       s.settimeout(1) 
       data,addr = s.recvfrom(buf) 
     except timeout: 
      f.close() 
      s.close() 
      print "File Downloaded" 

for i in range(1300,1305): 
    wrk = worker(i) 
    threads.append(wrk) 
+0

您需要創建一個對象來保存每個套接字連接。該對象將具有單獨的連接和偵聽方法。一旦你有了這個設置,你就可以在一個單獨的線程中運行每個對象的偵聽器方法。 – amicitas

回答

1

這將按照您的意圖工作,只是它會在每次發送新數據時都會覆蓋文件,而不會超時。 超時表示整個連接的結束。但是你可以很容易地重寫這個來將數據添加到相同的文件或創建一個新的文件或做任何你需要的。


#! /usr/bin/env python 
from socket import AF_INET, SOCK_DGRAM 
import socket 
import threading 

class Server (threading.Thread): 
    def __init__ (self, host="192.168.88.51", port=123, bufsize=128): 
     threading.Thread.__init__(self) 
     self.host = host 
     self.port = port 
     self.bufsize = bufsize 
     self.done = threading.Event() 

    def opensock (self): 
     s = socket.socket(AF_INET, SOCK_DGRAM) 
     s.bind((self.host, self.port)) 
     s.settimeout(0.001) 
     return s 

    def run (self): 
     host = self.host 
     port = self.port 
     self.s = s = self.opensock() 
     print "Waiting for connection on", host+":"+str(port) 
     while not self.done.isSet(): 
      try: 
       data, addr = s.recvfrom(self.bufsize) 
       print "Connection from", addr 
       s.settimeout(1) 
       self.recvdata(data, s, addr) 
       s.settimeout(0.001) 
      except socket.timeout: pass 
      except: 
       raise 
     self.done.set() 
     s.close() 
     print "Server on '%s:%s' stopped!" % (host, port) 

    def recvdata (self, initdata, conn, addr): 
     bufsize = self.bufsize 
     filename = str(self.port)+".data" 
     print "Opening file", filename 
     f = open(filename, "wb") 
     print "Receiving & writingrest of data from", addr 
     data = initdata 
     while data and not self.done.isSet(): 
      f.write(data) 
      try: 
       data, addr = conn.recvfrom(bufsize) 
      except socket.timeout: break 
     f.close() 
     if self.done.isSet(): 
      print "Forcefully interrupted transmission" 
     else: 
      print "File Downloaded" 

    def stop (self): 
     self.done.set() 
     self.s.close() 

servers = [] 
for port in xrange(123, 150): 
    try: 
     s = Server(port=port) 
     s.start() 
     servers.append(s) 
    except Exception as e: 
     print e 

raw_input("Press enter to send data to one of ports for testing . . . ") 
import random 
a = servers[0].host 
p = random.choice(servers).port 
print "data will be sent to port '%s:%i'" % (a, p) 
k = socket.socket(AF_INET, SOCK_DGRAM) 
k.connect((a, p)) 
k.send("1234567890") 
k.send("asdfghjkl") 
k.send("0987654321") 
k.close() 
raw_input("Press enter to close the program . . . ") 

# Stop all servers: 
for s in servers: 
    s.stop() 

# Make sure all of them terminated: 
for s in servers: 
    s.join() 

+0

哇!做得好。這就是它應該如何完成的。謝謝! – goetzmoritz

0

這做到了。自己想出來。

#!/usr/bin/env python 
from socket import * 
import sys 
import select 
import multiprocessing 

def worker(port): 
     print "started: "+str(port) 
     host="192.168.88.51" 
     s = socket(AF_INET,SOCK_DGRAM) 
     s.bind((host,port)) 
     addr = (host,port) 
     buf=128 
     data,addr = s.recvfrom(buf) 
     filename = str(port)+".jpg" 
     print str(port)+" received File:" 
     f = open(filename,'wb') 

     data, addr = s.recvfrom(buf) 
     try: 
      while(data): 
       f.write(data) 
       s.settimeout(1) 
       data,addr = s.recvfrom(buf) 
     except timeout: 
      f.close() 
      s.close() 
      print "File Downloaded" 

for i in range(1300,1305): 
    multiprocessing.Process(target=worker, args=(i,)).start() 
+0

雖然這可以解決您的問題,但這是非常糟糕的事情。也不完全是你Q的答案,因爲它使用多處理而不是多線程,這可能會導致一些不必要的複雜情況。您必須按照以上評論中的建議進行操作。對類threading.Thread()進行子類化,並將服務器放入其run()方法中。然後用不同的參數隨意創建Thread()。或者使用異步模塊來創建內核託管套接字,這些套接字可以在一個循環中一個接一個地進行處理,從而模擬線程。 – Dalen

+0

哦,順便說一句,threading.Thread()也可以以類似的方式用於multiprocessing.Process(),但是你應該通過繼承類並實現一個停止機制來真正做到這一點。如果在套接字打開時強行關閉程序,則以後可能無法連接到相同的端口。尤其在Windows上。 – Dalen

+0

你能爲你的解決方案提供一些代碼嗎?其實我還不明白... – goetzmoritz