我想以JSON格式發送消息給RabbitMQ並讓消息成功消耗。我試圖用駱駝來整合生產者和消費者。但是,我很難理解如何創建一條路線來實現這一點。我使用JSON模式來定義生產者和消費者之間的接口。我的應用程序創建JSON,將其轉換爲byte [],並使用Camel ProducerTemplate將消息發送到RabbitMQ。在消費者端,需要將byte []消息轉換爲String,然後轉換爲JSON,然後編組爲一個Object,以便我可以處理它。下面的代碼行不起作用然而駱駝處理來自RabbitMQ的JSON消息
from(startEndpoint).transform(body().convertToString()).marshal().json(JsonLibrary.Jackson, classOf[Payload]).bean(classOf[JsonBeanExample]),
這是因爲如果這個bean傳遞原始字節[]的內容,而不是由JSON json(JsonLibrary.Jackson, classOf[Payload])
創建的對象。我見過的所有使用json(..)調用的駱駝示例似乎後跟一個to(..),這是路由的結尾?以下是錯誤消息
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: uk.co.techneurons.messaging.Payload but has value: [[email protected] of type: byte[] on: Message: "{\"id\":1}". Caused by: No type converter available to convert from type: byte[] to the required type: uk.co.techneurons.messaging.Payload with value [[email protected] Exchange[ID-Tonys- iMac-local-54996-1446407983661-0-2][Message: "{\"id\":1}"]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: byte[] to the required type: uk.co.techneurons.messaging.Payload with value [[email protected]]`
我真的不希望使用Spring,註釋等,想服務激活儘可能簡單。使用駱駝儘可能
這是製片
package uk.co.techneurons.messaging
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
object RabbitMQProducer extends App {
val camelContext = new DefaultCamelContext
val rabbitMQEndpoint: String = "rabbitmq:localhost:5672/advert?autoAck=false&threadPoolSize=1&username=guest&password=guest&exchangeType=topic&autoDelete=false&declare=false"
val rabbitMQRouteBuilder = new RouteBuilder() {
override def configure(): Unit = {
from("direct:start").to(rabbitMQEndpoint)
}
}
camelContext.addRoutes(rabbitMQRouteBuilder)
camelContext.start
val producerTemplate = camelContext.createProducerTemplate
producerTemplate.setDefaultEndpointUri("direct:start")
producerTemplate.sendBodyAndHeader("{\"id\":1}","rabbitmq.ROUTING_KEY","advert.edited")
camelContext.stop
}
這是消費者..
package uk.co.techneurons.messaging
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.model.dataformat.JsonLibrary
object RabbitMQConsumer extends App {
val camelContext = new DefaultCamelContext
val startEndpoint = "rabbitmq:localhost:5672/advert?queue=es_index&exchangeType=topic&autoDelete=false&declare=false&autoAck=false"
val consumer = camelContext.createConsumerTemplate
val routeBuilder = new RouteBuilder() {
override def configure(): Unit = {
from(startEndpoint).transform(body().convertToString()).marshal().json(JsonLibrary.Jackson, classOf[Payload]).bean(classOf[JsonBeanExample])
}
}
camelContext.addRoutes(routeBuilder)
camelContext.start
Thread.sleep(1000)
camelContext.stop
}
case class Payload(id: Long)
class JsonBeanExample {
def process(payload: Payload): Unit = {
println(s"JSON ${payload}")
}
}
爲了完整,這是很容易複製的SBT文件..
name := """camel-scala"""
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= {
val scalaTestVersion = "2.2.4"
val camelVersion: String = "2.16.0"
val rabbitVersion: String = "3.5.6"
val slf4jVersion: String = "1.7.12"
val logbackVersion: String = "1.1.3"
Seq(
"org.scala-lang.modules" %% "scala-xml" % "1.0.3",
"org.apache.camel" % "camel-core" % camelVersion,
"org.apache.camel" % "camel-jackson" % camelVersion,
"org.apache.camel" % "camel-scala" % camelVersion,
"org.apache.camel" % "camel-rabbitmq" % camelVersion,
"com.rabbitmq" % "amqp-client" % rabbitVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"ch.qos.logback" % "logback-classic" % logbackVersion,
"org.apache.camel" % "camel-test" % camelVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test")
}
謝謝