2017-08-04 143 views
0

我正在嘗試學習和使用akka-stream-kafka並正在審閱其[文檔] [1]。在生產者設置部分,它告訴我們可以使用編程方式和從配置方式創建ProducerSettings。有一個程序化構建的例子,但沒有如何通過配置來創建它的例子。程序化結構很簡單,下面是一個例子。不過,我想使用配置基礎結構,並希望配置來自application.conf,因爲它會給我更多的控制權。我似乎無法在谷歌上找到它的一個例子。Haw在akka-stream-kafka中從配置(application.conf)創建ProducerSettings?

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
    .withBootstrapServers("localhost:9092") 

回答

1

該文檔只是向前您到Apache卡夫卡的Javadoc ProducerConfig,因爲是包含了一些你可以爲你的akka.kafka.producer.kafka-clients配置部分中鍵使用常量。

擴展從文檔的參考配置,一個例子是:

# Properties for akka.kafka.ProducerSettings can be 
# defined in this section or a configuration section with 
# the same layout. 
akka.kafka.producer { 
    # Tuning parameter of how many sends that can run in parallel. 
    parallelism = 100 

    # How long to wait for `KafkaProducer.close` 
    close-timeout = 60s 

    # Fully qualified config path which holds the dispatcher configuration 
    # to be used by the producer stages. Some blocking may occur. 
    # When this value is empty, the dispatcher configured for the stream 
    # will be used. 
    use-dispatcher = "akka.kafka.default-dispatcher" 

    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig 
    # can be defined in this configuration section. 
    kafka-clients { 
    bootstrap.servers = "localhost:9092" 
    enable.auto.commit = true 
    auto.commit.interval.ms = 10000 
    acks = "all" 
    retries = 0 
    batch.size = 16384 
    } 
} 

application.conf文件的內容將被默認加載你的ActorSystem,所以每當你創建一個ProducerSettings對象按如下,它應該從akka.kafka.producer進行配置。您不需要將配置顯式傳遞給構造函數。

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
+0

感謝您的評論,我瞭解了這一部分,我想知道如何使用此配置文件來初始化/設置'ProducerSettings'屬性。 – Explorer

+0

我已經更新了答案。實質上,該配置應該由Akka自動加載,不需要做任何明確的.. –

+0

它的工作,我沒有在我的課程路徑資源目錄。謝謝 – Explorer

相關問題