2017-08-10 74 views
0

多個連接考慮,你需要保持與只是ocassionally發送命令的設備256個的TCP連接的情況。我想同時做到這一點(它需要阻止,直到它得到響應),我試圖使用QThreadPool的這個目的,但我有一些懷疑,如果這是可能的。處理使用QThreadPool

我試圖用QRunnable,但我不知道插座將如何線程之間的行爲(即他們在創建插座應只用於線程?)

我也擔心這個解決方案的效率如果有人能夠提出一些替代方案,而不一定使用QT,我會很高興。

下面我張貼的代碼的某些代碼段。

class Task : public QRunnable { 

    Task(){ 
     //creating TaskSubclass instance and socket in it 
    } 

private: 
    TaskSubclass    *sub; 

    void run() override { 
     //some debug info and variable setting... 
     sub->doSomething(args); 
     return; 
    } 
}; 

class TaskSubclass { 
    Socket   *sock;   // socket instance 
    //... 
    void doSomething(args) 
    { 
     //writing to socket here 
    } 
} 

class MainProgram : public QObject{ 
    Q_OBJECT 
private: 
    QThreadPool *pool; 
    Task *tasks; 

public: 
    MainProgram(){ 
     pool = new QThreadPool(this); 
     //create tasks here 
    } 

    void run(){ 
     //decide which task to start 
     pool->start(tasks[i]); 
    } 
}; 
+1

請問爲什麼你要使用多線程的解決方案? Qt的事件系統能夠通過一個線程處理數百個Tcp連接,而不會出現麻煩,或者你的「套接字數據處理程序」 - 邏輯阻塞主線程? – Spiek

+0

感謝評論,我剛剛編輯帖子。它需要阻塞直到得到響應。 – Vido

+0

有沒有你不使用信號和插槽機制的原因?你可以使用QTcpSocket的readyRead-Signal並將它連接到一個插槽。 – Spiek

回答

0

我對這個問題的方案最是通過複用select()您的插座。這樣你就不需要創建額外的線程,而且這是一個「非常POSIX」的方式。

見例如請參見本教程:

http://www.binarytides.com/multiple-socket-connections-fdset-select-linux/

或相關問題:

Using select(..) on client

+0

我想通過使用像QTcpSockets或boost套接字這樣的包裝來保持更高的抽象層,所以我不需要在原始內存上操作。 – Vido

+0

思考過後,我可以從這些wrappes中僅使用select()的那一刻獲取原始描述符,但我想使它平行。這仍然是很好的選擇,謝謝你。 – Vido

0

由於OMD_AT有媒體鏈接指出,最好的解決辦法是使用選擇()並讓內核做的工作適合你:-)

在這裏你有作爲的一個例子ync方法和Syncron多線程方法。

在這個例子中,我們創建10個連接到一個谷歌的互聯網服務和做一個簡單的GET請求到服務器,我們測量所需的每一種方法的所有連接多長時間才能收到來自谷歌服務器的響應。

請注意您應該使用更快的網絡服務器進行真正的測試,例如本地主機,因爲網絡延遲對結果有很大影響。

#include <QCoreApplication> 
#include <QTcpSocket> 
#include <QtConcurrent/QtConcurrentRun> 
#include <QElapsedTimer> 
#include <QAtomicInt> 

class Task : public QRunnable 
{ 
    public: 
     Task() : QRunnable() {} 
     static QAtomicInt counter; 
     static QElapsedTimer timer; 
     virtual void run() override 
     { 
      QTcpSocket* socket = new QTcpSocket(); 
      socket->connectToHost("www.google.com", 80); 
      socket->write("GET/HTTP/1.1\r\nHost: www.google.com\r\n\r\n"); 
      socket->waitForReadyRead(); 
      if(!--counter) { 
       qDebug("Multiple Threads elapsed: %lld nanoseconds", timer.nsecsElapsed()); 
      } 
     } 
}; 

QAtomicInt Task::counter; 
QElapsedTimer Task::timer; 

int main(int argc, char *argv[]) 
{ 
    QCoreApplication app(argc, argv); 

    // init 
    int connections = 10; 
    Task::counter = connections; 
    QElapsedTimer timer; 

    /// Async via One Thread (Select) 

    // handle the data 
    auto dataHandler = [&timer,&connections](QByteArray data) { 
     Q_UNUSED(data); 
     if(!--connections) qDebug(" Single Threads elapsed: %lld nanoseconds", timer.nsecsElapsed()); 
    }; 

    // create 10 connection to google.com and send an http get request 
    timer.start(); 
    for(int i = 0; i < connections; i++) { 
     QTcpSocket* socket = new QTcpSocket(); 
     socket->connectToHost("www.google.com", 80); 
     socket->write("GET/HTTP/1.1\r\nHost: www.google.com\r\n\r\n"); 
     QObject::connect(socket, &QTcpSocket::readyRead, [dataHandler,socket]() { 
      dataHandler(socket->readAll()); 
     }); 
    } 


    /// Async via Multiple Threads 

    Task::timer.start(); 
    for(int i = 0; i < connections; i++) { 
     QThreadPool::globalInstance()->start(new Task()); 
    } 

    return app.exec(); 
} 

打印:

Multiple Threads elapsed: 62324598 nanoseconds 
    Single Threads elapsed: 63613967 nanoseconds 
+0

正如我在評論中提到的那樣,我真的想要利用計算機中的許多內核,這些內核將專用於此軟件。 – Vido

+0

我已添加多線程處理支持 – Spiek

+0

在主線程中創建套接字是否安全並在其中對其執行一些操作?爲了提高效率,您還可以比較此解決方案和select()方法的性能? (是否有很大的區別) – Vido

0

雖然,答案已經被接受了,我想和大家分享我的)

我從你的問題明白了什麼:有256當前活動連接,您不時發送一個請求(命名爲「command」)給其中一個請求並等待響應。同時,要進行這個過程中多線程和,雖然你說:「這需要阻塞,直到它獲得響應」,我想你暗示阻止它處理請求 - 響應進程中的線程,但不是主線程。

如果我確實瞭解這個問題吧,這裏是我建議使用Qt來做到這一點:

#include <functional> 

#include <QObject>   // need to add "QT += core" in .pro 
#include <QTcpSocket>  // QT += network 
#include <QtConcurrent>  // QT += concurrent 
#include <QFuture>   
#include <QFutureWatcher> 

class CommandSender : public QObject 
{ 
public: 
    // Sends a command via connection and blocks 
    // until the response arrives or timeout occurs 
    // then passes the response to a handler 
    // when the handler is done - unblocks 
    void SendCommand(
     QTcpSocket* connection, 
     const Command& command, 
     void(*responseHandler)(Response&&)) 
    { 
     const int timeout = 1000; // milliseconds, set it to -1 if you want no timeouts 

     // Sending a command (blocking) 
     connection.write(command.ToByteArray()); // Look QByteArray for more details 
     if (connection.waitForBytesWritten(timeout) { 
      qDebug() << connection.errorString() << endl; 
      emit error(connection); 
      return; 
     } 

     // Waiting for a response (blocking) 
     QDataStream in{ connection, QIODevice::ReadOnly }; 
     QString message; 
     do { 
      if (!connection.waitForReadyRead(timeout)) { 
       qDebug() << connection.errorString() << endl; 
       emit error(connection); 
       return; 
      } 
      in.startTransaction(); 
      in >> message; 
     } while (!in.commitTransaction()); 

     responseHandler(Response{ message }); // Translate message to a response and handle it 
    } 

    // Non-blocking version of SendCommand 
    void SendCommandAsync(
     QTcpSocket* connection, 
     const Command& command, 
     void(*responseHandler) (Response&&)) 
    { 
     QFutureWatcher<void>* watcher = new QFutureWatcher<void>{ this }; 
     connect(watcher, &QFutureWatcher<void>::finished, [connection, watcher]() 
     { 
      emit done(connection); 
      watcher->deleteLater(); 
     }); 

     // Does not block, 
     // emits "done" when finished 
     QFuture<void> future 
      = QtConcurrent::run(this, &CommandSender::SendCommand, connection, command, responseHandler); 
     watcher->setFuture(future); 
    } 

signals: 
    void done(QTcpSocket* connection); 
    void error(QTcpSocket* connection); 
} 

現在你可以使用從線程池取出一個單獨的線程發送命令到插座:下引擎蓋QtConcurrent::run()使用由Qt爲您提供的QThreadPool的全局實例。該線程阻塞,直到它得到一個響應,然後用responseHandler來處理它。同時,管理所有命令和套接字的主線程將保持暢通。只需抓住done()信號即可知道響應已成功接收和處理。

有一點需要注意:異步版本只有在線程池中有空閒線程時才發送請求,否則就等待它。當然,這是任何線程池的行爲(這正是這種模式的要點),但不要忘記這一點。

另外我寫的沒有Qt的代碼很方便,所以可能會包含一些錯誤。

編輯:事實證明,這不是線程安全的,因爲套接字在Qt中不可重入。

你可以做的是將一個互斥量與一個套接字關聯,並在每次執行其功能時將其鎖定。這可以很容易地創建QTcpSocket類的包裝。請糾正我,如果我錯了。

+0

謝謝,它看起來不錯,但如果我們將指針傳遞給QtConcureent.run()中的套接字,不應該存在線程安全問題?我聽說套接字只能用於創建它們的線程,並且在不同線程之間傳遞它們是不安全的(我不知道應該創建哪些套接字,因爲主線程會不安全?)。如果我錯了,或者你有一些解釋說這是安全的,請糾正我。 – Vido

+0

你是對的,它不是線程安全的。原因是 - Qt中的套接字不可重入(我忘記了,很抱歉)。所以我遇到了這種情況,我嘗試管理套接字從主線程連接/斷開連接並讀取/寫入另一個工作線程。我的錯。 **但是**它導致了一個合乎邏輯的結論,即不管什麼(至少在Qt中),套接字上的所有**操作都必須在同一個線程中執行。 – WindyFields