2017-07-19 137 views

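Spring Integration Java DSL - Java 7 - setting a poller in an outbound channel adapter

I am trying to migrate some of my code from XML to the Java DSL style (pre-Java 8).

This is the Java configuration I have created, but I cannot figure out how to set the poller. The examples only discuss a global poller, whereas I need to set the poller on the adapter itself.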

@Bean
public MessageHandler kafkaMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
    // Note: LiteralExpression returns this string verbatim; reading the
    // kafka_topic header would need a SpEL expression instead.
    handler.setTopicExpression(new LiteralExpression("headers.kafka_topic"));
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(producerConfigs()));
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // introduce a delay on the send to allow more messages to accumulate
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    return properties;
}

The equivalent XML I have is as follows:

<int-kafka:outbound-channel-adapter 
    id="kafkaOutboundChannelAdapter" 
    kafka-producer-context-ref="kafkaProducerContext" 
    channel="kafkaChannel" > 
    <int:poller fixed-rate="1000" max-messages-per-poll="10000"/>
</int-kafka:outbound-channel-adapter> 

Answer


See the documentation ......

@Bean 
@ServiceActivator(inputChannel = "kafkaChannel", poller = @Poller(fixedDelay = "1000", ...))
public MessageHandler kafkaMessageHandler() { 
    ... 
} 
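
A fuller sketch of the same idea, mirroring the XML poller from the question (fixed-rate 1000, max-messages-per-poll 10000), might look like the following. The class name KafkaOutboundConfig and the QueueChannel bean are assumptions that do not appear in the question; a poller only takes effect when the input channel is pollable, so a queue-backed channel is assumed here. The KafkaTemplate is assumed to be defined elsewhere, as in the question's configuration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;

// Hypothetical configuration class; the name is for illustration only.
@Configuration
@EnableIntegration
public class KafkaOutboundConfig {

    // Assumption: kafkaChannel is a pollable (queue-backed) channel,
    // otherwise the poller on the service activator has nothing to poll.
    @Bean
    public PollableChannel kafkaChannel() {
        return new QueueChannel();
    }

    // The poller is scoped to this endpoint only, not configured globally.
    // fixedRate and maxMessagesPerPoll mirror the XML attributes
    // fixed-rate and max-messages-per-poll.
    @Bean
    @ServiceActivator(inputChannel = "kafkaChannel",
            poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "10000"))
    public MessageHandler kafkaMessageHandler(KafkaTemplate<String, String> kafkaTemplate) {
        return new KafkaProducerMessageHandler<>(kafkaTemplate);
    }
}
```

Note that the XML in the question uses fixed-rate, while the snippet above it uses fixedDelay; the two differ in whether the interval is measured from the start or the end of the previous poll, so pick whichever matches the original behaviour you want to keep.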

Thanks, this is exactly what I was looking for. – Vidhya
