2015-12-30 70 views
1

我是卡夫卡的新手。如何通過Kafka連續讀取文件?

我已經嘗試了一些文件閱讀的例子,並適用於我的項目幾個星期。但是,我的應用程序似乎並不像我想的那樣工作,所以我要求您提供建議。

我的目的是:

  1. 卡夫卡生產從目錄中讀取A.文件
  2. 風暴消耗一個已經從1
  3. 移動產生遠曾經讀過文件到其他目錄數據。

條件:

  1. 文件連續發送到目錄A.

這是一個簡單的邏輯,但它讓我頭疼。

到目前爲止,我已經在我的本地計算機eclipse上創建並測試了卡夫卡生產者代碼。

我以爲是因爲kafka製作者應該繼續讀取文件,即使目錄A中的所有文件都被讀取,該進程也必須保持活動狀態。 但是,只要目錄A中的所有文件都被讀取併發送,它就會終止。

我使用3個代理在單個節點上運行Kafka,以下是生產者屬性設置。

Properties props = new Properties(); 

props.put("metadata.broker.list", "localhost:9092"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 
props.put("producer.type", "sync"); 
props.put("request.required.acks", "1"); 

主題已使用以下命令創建。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test 

是我一直覺得文件中的卡夫卡的體系結構的角度讀錯了嗎? 還是有沒有一種方法,我還沒有找到? 如果有人能回答我的問題,我將不勝感激。

回答

0

您應該使用kafka.serializer.DefaultSerializer(二進制)。
你如何監視新文件的文件夾? 你可以使用類似apache.commons.io.monitor的東西。看看here

你卡在哪裏?你需要解決哪些問題(錯誤消息,真的有什麼)? 問,因爲它看起來像你想從某人完整的解決方案,這不是什麼所提供的。挖掘並提出具體的問題,當然,發佈代碼。

+0

感謝您的諮詢!這非常有幫助。稍後我會試着提出具體的問題。 – medicrush

0

我有類似的要求。我能夠從Java代碼創建主題..但是,消息沒有得到通過。沒有錯誤,主題目錄下的(00000000000000.log)文件保持空白。

Can someone please help me to figure out issue. ? 

Note: I'm able to create/pass messages to topics through consoles 


Java COde: 

import java.util.HashMap; 
import java.util.Map; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 

/** 
* Simple Producer 
*/ 
public class KafkaMessageProducer { 

     public static void main(String[] args) { 
     Map<String, Object> config = new HashMap<String, Object>(); 
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.50.85.90:6667"); 
     config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config); 
     producer.send(new ProducerRecord<String, String>("topic-1", "kdfgdfk", "vdfgdfgv")); 


    System.out.println("Hi Kafka Producer"); 
    producer.close(); 

    } 

} 
+0

如果在控制檯中一切正常,那麼你有沒有檢查kafka服務器配置?在我的情況下,未定義config/server.properties中的advertised.host.name。 – medicrush

+0

嗨,請你分享你的server.properties文件嗎?我只是想看看你配置了那個..謝謝。 – NiralKK

+0

實際上,我的server.properties文件沒有多少變化。簡單地取消了advertised.host.name的註釋並添加了我的主機IP。你可以在這裏看到相關的文章。 http://blog.yxbilu.com/blog/r/23.html – medicrush