2016-12-18 11 views
1

我嘗試測試一個網絡應用程序。但是我可能會在腦中產生一個結。 據我所知,minitest中的測試並行運行。在此假設,我認爲很明顯的是,當我分配在設置一個端口()失敗時,幾個測試運行:如何在Minitest中爲TCPSocket寫幾個測試

RuntimeError: Address already in use - bind(2) for nil port 2000 TCPServer new failed 

那麼,什麼是做監聽在一臺服務器上多次測試的最佳實踐港口?

class ServerTest < Minitest::Test 

     def setup 
     # -- set env variable 
     ENV["conf"] = "./tc_gpio.config" 
     Thread::abort_on_exception = true 
     @my_thr = Thread.start() do 
      @server = Server.new  
      @server.start 
      assert @server.hi() != nil 
     end 
     end 


     def teardown 
     Thread.kill(@my_thr) # sends exit() to thr 
     end 

     def test_connect_and_disconnect 
     sleep 1 
     hostname = 'localhost' 
     port = 2000 
     s = TCPSocket.open(hostname, port) 
     my_s = s.recvmsg() 
     s.sendmsg(:set.to_s, 0) # Failes since a serialized object is expected 
     my_s2 = s.recvmsg() 

     assert_equal( "READY" , my_s[0]) 
     assert_equal("eeFIN" , my_s2[0]) 
     end 

     def test_send_command 

     # fill command 
     com = Command.new 
     com.type = :set 
     com.device_name = 'pump0' 
     com.device_address = 'resource0' 
     com.value = 2 

     serial_com = YAML::dump(com) 

     sleep 1 
     hostname = 'localhost' 
     port = 2000 
     s = TCPSocket.open(hostname, port) 
     my_s = s.recvmsg() 
     s.sendmsg(serial_com, 0) 
     my_s2 = s.recvmsg() 


     assert_equal( "READY" , my_s[0]) 
     assert_equal("FIN" , my_s2[0]) 
     end 
    end 

回答

0

當並行測試TCP服務器時,服務器的每個實例都應該啓動一個獨立的端口。這可以通過在創建套接字時指定端口號0來完成。當端口號0給出,套接字將被綁定到一個隨機的未使用的端口:

interface = "0.0.0.0" 
port = 0 
tcp_server = TCPServer.new(interface, port) 

你可以找出哪個端口的TCP服務器套接字被綁定到:

bound_port = @server_socket.addr[1] 

一種方法使用這些事實是有一個服務器是這樣的:

class Server 

    # Create a server instance. If the port is unspecified, 
    # or 0, then a random ephemeral port is bound to. 
    def initialize(interface: "127.0.0.1", port: 0) 
    @server_socket = TCPServer.new(interface, port) 
    ... 
    end 

    # Return the port the server socket is bound to. 
    def bound_port 
    @server_socket.addr[1] 
    end 

    ... 

end 

測試,然後創建使用端口0服務器實例:

server = Server.new(port: 0) 

向服務器的連接時,該測試使用#bound_port訪問,找出連接到哪個端口:

client = TCPSocket.open("localhost", server.bound_port) 

,然後正常進行。