2017-05-05 83 views
0

有人可以向我解釋如何通過Logstash觸發Celery任務嗎? 這可能嗎?Logstash - 如何通過RabbitMQ觸發Celery任務

如果我試圖通過 'PHP-amqplib' 庫做,在PHP中正常工作:(不使用Logstash)

$connection = new AMQPStreamConnection(
    'rabbitmq.local', 
    5672, 
    'guest', 
    'guest' 
); 
$channel = $connection->channel(); 
$channel->queue_declare(
    'celery', 
    false, 
    true, 
    false, 
    false 
); 
$taskId = rand(1000, 10000); 
$props = array(
    'content_type' => 'application/json', 
    'content_encoding' => 'utf-8', 
); 

$body = array(
    'task'  => 'process_next_task', 
    'lang'  => 'py', 
    'args'  => array('ktest' => 'vtest'), 
    'kwargs' => array('ktest' => 'vtest'), 
    'origin' => '@'.'mytest', 
    'id'  => $taskId, 
); 

$msg = new AMQPMessage(json_encode($body), $props); 
$channel->basic_publish($msg, 'celery', 'celery'); 

按照芹菜文檔:

http://docs.celeryproject.org/en/latest/internals/protocol.html

我試圖以json格式發送請求,這是我的Logstash過濾器:

ruby 
{ 
    remove_field => ['headers', '@timestamp', '@version', 'host', 'type'] 
    code => " 
     event.set('properties', 
     { 
      :content_type => 'application/json', 
      :content_encoding => 'utf-8' 
     }) 
    " 
} 

和芹菜的回答是:

[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?! 
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}} 

基本上,芹菜不能我的消息格式或解碼更好......我不能設置JSON格式:)

它的請求快把我逼瘋了,謝謝您的任何線索:)

忘記它,這是我的Logstash輸出插件

rabbitmq 
      { 
       key    => "celery" 
       exchange  => "celery" 
       exchange_type => "direct" 
       user   => "${RABBITMQ_USER}" 
       password  => "${RABBITMQ_PASSWORD}" 
       host   => "${RABBITMQ_HOST}" 
       port   => "${RABBITMQ_PORT}" 
       durable   => true 
       persistent  => true 
       codec   => json 

      } 

回答

1

從信息provid在this question編輯,你不能。

當您在使用ruby過濾器處理事件時,實際上是在處理消息正文中的內容,而您想設置消息的rabbitmq頭和屬性。

直到該功能已被tackled,我認爲你不能實現它,除非你自己實現它。畢竟,插件在github上可用。