2015-11-01 77 views
0

我想以JSON格式發送消息給RabbitMQ並讓消息成功消耗。我試圖用駱駝來整合生產者和消費者。但是,我很難理解如何創建一條路線來實現這一點。我使用JSON模式來定義生產者和消費者之間的接口。我的應用程序創建JSON,將其轉換爲byte [],並使用Camel ProducerTemplate將消息發送到RabbitMQ。在消費者端,需要將byte []消息轉換爲String,然後轉換爲JSON,然後編組爲一個Object,以便我可以處理它。下面的代碼行不起作用然而駱駝處理來自RabbitMQ的JSON消息

from(startEndpoint).transform(body().convertToString()).marshal().json(JsonLibrary.Jackson, classOf[Payload]).bean(classOf[JsonBeanExample]), 

這是因爲如果這個bean傳遞原始字節[]的內容,而不是由JSON json(JsonLibrary.Jackson, classOf[Payload])創建的對象。我見過的所有使用json(..)調用的駱駝示例似乎後跟一個to(..),這是路由的結尾?以下是錯誤消息

Caused by: org.apache.camel.InvalidPayloadException: No body available of type: uk.co.techneurons.messaging.Payload but has value: [[email protected] of type: byte[] on: Message: "{\"id\":1}". Caused by: No type converter available to convert from type: byte[] to the required type:  uk.co.techneurons.messaging.Payload with value [[email protected] Exchange[ID-Tonys- iMac-local-54996-1446407983661-0-2][Message: "{\"id\":1}"]. Caused by:  [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: byte[] to the required type: uk.co.techneurons.messaging.Payload with value [[email protected]]`  

我真的不希望使用Spring,註釋等,想服務激活儘可能簡單。使用駱駝儘可能

這是製片

package uk.co.techneurons.messaging 
import org.apache.camel.builder.RouteBuilder 
import org.apache.camel.impl.DefaultCamelContext 

object RabbitMQProducer extends App { 
    val camelContext = new DefaultCamelContext 
    val rabbitMQEndpoint: String = "rabbitmq:localhost:5672/advert?autoAck=false&threadPoolSize=1&username=guest&password=guest&exchangeType=topic&autoDelete=false&declare=false" 
    val rabbitMQRouteBuilder = new RouteBuilder() { 
    override def configure(): Unit = { 
     from("direct:start").to(rabbitMQEndpoint) 
    } 
    } 
    camelContext.addRoutes(rabbitMQRouteBuilder) 
    camelContext.start 
    val producerTemplate = camelContext.createProducerTemplate 
    producerTemplate.setDefaultEndpointUri("direct:start") 
    producerTemplate.sendBodyAndHeader("{\"id\":1}","rabbitmq.ROUTING_KEY","advert.edited") 
    camelContext.stop 
} 

這是消費者..

package uk.co.techneurons.messaging 
import org.apache.camel.builder.RouteBuilder 
import org.apache.camel.impl.DefaultCamelContext 
import org.apache.camel.model.dataformat.JsonLibrary 

object RabbitMQConsumer extends App { 
    val camelContext = new DefaultCamelContext 
    val startEndpoint = "rabbitmq:localhost:5672/advert?queue=es_index&exchangeType=topic&autoDelete=false&declare=false&autoAck=false" 
    val consumer = camelContext.createConsumerTemplate 
    val routeBuilder = new RouteBuilder() { 
    override def configure(): Unit = { 
     from(startEndpoint).transform(body().convertToString()).marshal().json(JsonLibrary.Jackson, classOf[Payload]).bean(classOf[JsonBeanExample]) 
    } 
} 
camelContext.addRoutes(routeBuilder) 
camelContext.start 
Thread.sleep(1000) 
camelContext.stop 
} 

case class Payload(id: Long) 

class JsonBeanExample { 
    def process(payload: Payload): Unit = { 
    println(s"JSON ${payload}") 
    } 
} 

爲了完整,這是很容易複製的SBT文件..

name := """camel-scala""" 

version := "1.0" 

scalaVersion := "2.11.7" 

libraryDependencies ++= { 
val scalaTestVersion = "2.2.4" 
val camelVersion: String = "2.16.0" 
val rabbitVersion: String = "3.5.6" 
val slf4jVersion: String = "1.7.12" 
val logbackVersion: String = "1.1.3" 
Seq(
    "org.scala-lang.modules" %% "scala-xml" % "1.0.3", 
    "org.apache.camel" % "camel-core" % camelVersion, 
    "org.apache.camel" % "camel-jackson" % camelVersion, 
    "org.apache.camel" % "camel-scala" % camelVersion, 
    "org.apache.camel" % "camel-rabbitmq" % camelVersion, 
    "com.rabbitmq" % "amqp-client" % rabbitVersion, 
    "org.slf4j" % "slf4j-api" % slf4jVersion, 
    "ch.qos.logback" % "logback-classic" % logbackVersion, 
    "org.apache.camel" % "camel-test" % camelVersion % "test", 
    "org.scalatest" %% "scalatest" % scalaTestVersion % "test") 
} 

謝謝

回答

1

我決定我需要創建一個Bean並註冊它(說起來容易做起來難! - 對於一些未知的原因JNDIRegistry沒有與DefaultCamelContext工作 - 所以我用了SimpleRegistry),

val registry: SimpleRegistry = new SimpleRegistry() 
    registry.put("myBean", new JsonBeanExample()) 
    val camelContext = new DefaultCamelContext(registry) 

然後,我改變了消費routeBuilder - 好像我已經過改造的消息。

from(startEndpoint).unmarshal.json(JsonLibrary.Jackson, classOf[Payload]).to("bean:myBean?method=process") 

我也改變了豆所以setter方法可用,並添加一個toString

class Payload { 
    @BeanProperty var id: Long = _ 
    override def toString = s"Payload($id)" 
} 
class JsonBeanExample() { 
    def process(payload: Payload): Unit = { 
    println(s"recieved ${payload}") 
    } 
} 

現在接下來的問題是讓死信隊列工作,並確保在Bean處理程序故障使他們的方式正確備份堆棧