第一批: - 我想從100平面文件中拉出數據並加載到一個數組並將它們作爲字節數組逐個插入到kafka生產者中。Golang Kafka沒有消耗所有消息偏移西南
第二批: - 我正在消費卡夫卡消費者,然後將它們插入NoSQL數據庫。
我在Shopify sarama golang包的Kafka配置文件中使用Offsetnewset。
我可以接收和插入消息給卡夫卡,但消費時我只能得到第一條消息。因爲我在sarama配置中給了最新的Offset。 我怎樣才能得到這裏的所有數據。
第一批: - 我想從100平面文件中拉出數據並加載到一個數組並將它們作爲字節數組逐個插入到kafka生產者中。Golang Kafka沒有消耗所有消息偏移西南
第二批: - 我正在消費卡夫卡消費者,然後將它們插入NoSQL數據庫。
我在Shopify sarama golang包的Kafka配置文件中使用Offsetnewset。
我可以接收和插入消息給卡夫卡,但消費時我只能得到第一條消息。因爲我在sarama配置中給了最新的Offset。 我怎樣才能得到這裏的所有數據。
這是很難能夠告訴無需任何代碼如何卡夫卡配置一些或更深入的解釋(即:主題,分區,...),所以很少快速檢查來我的腦海:
假設你開始與OffsetNewest消耗設置你開始生產之前,有一兩件事,也許發生的事情是,你是不是從該主題的所有分區的消耗,對於以薩拉馬文檔,你必須明確地由消費每個分區創建PartitionConsumers。從例子中https://godoc.org/github.com/Shopify/sarama#Consumer:
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
panic(err)
}
...
consumed := 0
ConsumerLoop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message offset %d\n", msg.Offset)
consumed++
case <-signals:
break ConsumerLoop
}
}
你,其實,也開始消費後產生的所有事件,因此,指針讀他們一切都不OffsetNewest但OffsetOldest代替。
我很抱歉不能給你一個更加有用的答案,但也許如果你粘貼一些代碼或提供更多的細節,我們可以幫助更。