2017-02-28 130 views
4

這是我的代碼片段。如何在spring引導應用程序中處理與spring-rabbit的JSON消息?

  • MQConfiguration類配置爲處理接收消息

    @RabbitListener(queues = "testMQ") 
    public class Receiver { 
    
        @RabbitHandler 
        public void receive(Message msg){ 
         System.out.println(msg.toString()); 
        } 
    } 
    
  • 這裏

    @Configuration 
    public class MQConfiguration { 
        @Bean 
        public Receiver receiver() { 
         return new Receiver(); 
        } 
    } 
    
  • Receiver類是JSON消息我發送到的RabbitMQ

    { 
        "id": 1, 
        "name": "My Name", 
        "description": "This is description about me" 
    } 
    

但是,當我運行我的應用程序時,我收到以下錯誤消息。

2017-02-28 17:16:35.931 WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. 

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60] 
Caused by: org.springframework.amqp.AmqpException: No method found for class [B 
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    ... 10 common frames omitted 

那麼我應該怎麼做,如果我只想要在方法中打印JSON消息呢?我真的很感謝任何人都可以對此發表看法。 :)

回答

0

如果你使用Spring啓動,你只需要配置:

@Bean 
public MessageConverter jsonMessageConverter() { 
    return new Jackson2JsonMessageConverter(); 
} 

否則,您必須配置:

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
... 
    factory.setMessageConverter(new Jackson2JsonMessageConverter()); 
... 
    return factory; 
} 

http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven

+0

另請參閱[spring-rabbit-json示例](https://github.com/spring-projects/spring-amqp-samples)。 –

+0

@Artem Bilan是的,我使用Spring Boot,並且在配置類中添加'jsonMessageConverter()'註釋後錯誤仍然會提示,您可以從[here](https://github.com/kenshinji/)查看我的代碼。 rabbitmqtest)。 – kenshinji

0

爲了JSON發送到RabbitMQ的並通過Spring Boot使用它,我們需要設置content_type

讓我用一個例子,我有一個Python生產者和Java的消費者(我發送JSON從Python來的RabbitMQ和Spring的Java啓動了承接該任務JSON)描述。

解決辦法有兩個:

解決方案1:發送爲JSON字符串,並將其使用手動Jakson或GSON

轉換需要設置的CONTENT_TYPE = 「text/plain的」,並轉換成JSON到一個字符串。然後在Spring端,使用一個字符串作爲輸入的函數作爲監聽器並手動轉換該對象。

RabbitHandler:

@RabbitHandler 
public void receive(String inputString) throws IOException { 
    ObjectMapper objectMapper = new ObjectMapper(); 
    SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class); 

    System.out.println("String instance " + theResult.toString() + 
      " [x] Received"); 
} 

SimStatusReport對象:

import lombok.AllArgsConstructor; 
import lombok.Data; 
import lombok.NoArgsConstructor; 

@Data 
@AllArgsConstructor 
@NoArgsConstructor 
public class SimStatusReport { 
    private String id; 
    private int t; 
} 

這裏是我的Python代碼:

import pika 
import json 
import uuid 


connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channelResult = connectionResult.channel() 
routing_key_result = 'sim_results' 
channelResult.queue_declare(queue=routing_key_result, durable=True) 

def publish_result(sim_status): 
    message =json.dumps(sim_status) 
    channelResult.basic_publish(exchange='', 
           routing_key=routing_key_result, 
           body=message, 
           properties=pika.BasicProperties(
            content_type="text/plain", 
            content_encoding= 'UTF-8', 
            delivery_mode=2, # make message persistent 
         )) 
    print("Sent ", message) 


newsim_status = {'id': str(uuid.uuid4()), 't': 0} 
publish_result(newsim_status) 

解決方案2:發送JSON字符串,並讓Jackson2JsonMessageConverter做轉換爲你自動。

您需要設置的CONTENT_TYPE = 「應用/ JSON」。然後,您需要在RabbitMQ請求的標頭中將相應的標題添加到__TypeId__。您需要包含對象的確切名稱空間,以便傑克遜可以抵制轉換。

下面是使用Python(只是publish_result機能的研究),我的例子:

def publish_result(sim_status): 
    message =json.dumps(sim_status) 
    channelResult.basic_publish(exchange='', 
           routing_key=routing_key_result, 
           body=message, 
           properties=pika.BasicProperties(
            content_type="application/json" 
            headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'}, 
            content_encoding= 'UTF-8', 
            delivery_mode=2, # make message persistent 
         )) 
    print("Sent ", message) 

然後,你需要在Java配置爲使用Jackson2JsonMessageConverter:

@Configuration 
    public class RabbitConfiguration { 
     @Bean 
     public MessageConverter jsonMessageConverter() { 
      return new Jackson2JsonMessageConverter(); 
     } 
    } 

這裏將是你的聽衆:

@RabbitListener(queues = "sim_results") 
public class TaskReceiver { 
    @RabbitHandler 
    public void receive(SimStatusReport in) { 
     System.out.println("Object instance " + in + 
       " [x] Received"); 
    } 
} 

說明: 確保所有對象都具有所有屬性和所有參數構造函數的setter和getter。我從龍目島使用@Data,@NoArgsConstructor和@AllArgsConstructor自動生成它

相關問題