0

EDIT2的恆定延遲:最後,我已經用Java做了我自己製作,而且運作良好,因此的問題是在卡夫卡的控制檯製片。卡夫卡控制檯消費者運作良好。卡夫卡+星火流:1秒

編輯:我已經嘗試過版本0.9.0.1,並具有相同的行爲。

我正在研究我的單身漢最終項目,Spark Streaming和Flink之間的比較。在兩個框架之前,我使用Kafka和一個腳本來生成數據(如下所述)。我的第一個測試是比較兩種框架與簡單工作負載之間的延遲,而Kafka給我一個非常高的延遲(持續1秒)。爲了簡單起見,目前我只用一臺機器運行Kafka和Spark。

我已經尋找並發現了類似的問題,並嘗試了他們給出的解決方案,但沒有任何改變。我還檢查了所有的卡夫卡配置的官方文檔中,並把importants在我的配置文件的等待時間,這是我的配置:

卡夫卡0.10.2.1 - 星火2.1.0

server.properties :

num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
num.partitions=2 
num.recovery.threads.per.data.dir=1 
log.flush.interval.messages=1000 
log.flush.interval.ms=50 
log.retention.hours=24 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 
flush.messages=100 
flush.ms=10 

producer.properties:

compression.type=none 
max.block.ms=200 
linger.ms=50 
batch.size=0 

火花流程序:(其打印接收到的數據,並將所創建的數據,並且當正在爲函數處理時之間的差)

package com.tfg.spark1.spark1; 

import java.util.Map; 
import java.util.HashMap; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.*; 
import scala.Tuple2; 
import org.apache.spark.streaming.kafka.*; 

public final class Timestamp { 

    public static void main(String[] args) throws Exception { 
     if (args.length < 2) { 
      System.err.println("Usage: Timestamp <topics> <numThreads>"); 
      System.exit(1); 
     } 

     SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp"); 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100)); 


     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     int numThreads = Integer.parseInt(args[1]); 
     topicMap.put(args[0], numThreads); 

     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2> 

     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      private static final long serialVersionUID = 1L; 

      public String call (Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 

     JavaDStream<String> newLine = lines.map(new Function<String, String>() { 
      private static final long serialVersionUID = 1L; 

      public String call(String line) { 
       String[] tuple = line.split(" "); 
       String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1])); 
       //String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime); 

       return totalTime; 
      } 
     }); 

     lines.print(); 
     newLine.print(); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

生成的數據具有以下格式:

"Random bits" + " " + "current time in ms" 
01 1496421618634 
11 1496421619044 
00 1496421619451 
00 1496421618836 
10 1496421619247 

最後,當我運行火花流程序和腳本生成器,它在本例中生成每200ms的數據,火花(批次間隔= 100毫秒)打印9個空批次,和每一秒(總900毫秒的時刻,如:Tim E:1496421619 MS)這導致

------------------------------------------- 
Time: 1496421619900 ms 
------------------------------------------- 
01 1496421618634 
11 1496421619044 
00 1496421619451 
00 1496421618836 
10 1496421619247 
------------------------------------------- 
Time: 1496421619900 ms 
------------------------------------------- 
1416 
1006 
599 
1214 
803 

另外,如果我運行一個卡夫卡的命令行,製片人和另一個命令行的消費,它總是需要一些時間來打印生產數據消費者。

在此先感謝您的幫助!

+1

嘗試簡單的消費者首先看看它的火花具體或卡夫卡具體。幾乎沒有帖子(也來自linkedin)報告30毫秒的延遲。 –

+0

你的意思是卡夫卡控制檯消費者?我已經嘗試過,它也會延遲收到元素。我也從幾個網站上看到它可以實現這種延遲。我也會嘗試使用較舊的Kafka版本。謝謝! :D – Franmoti

+0

它也可能取決於您的硬件(例如線程數)。也嘗試看到系統處於穩定狀態(不只是一兩個消息)也許需要時間來「預熱」 –

回答

1

我剛剛更新了您打開的JIRA,以及您總是看到1000毫秒延遲的原因。

https://issues.apache.org/jira/browse/KAFKA-5426

我在這裏的原因報告...

的linger.ms參數使用如果不指定爲1000毫秒的命令行上--timeout選項設置。 同一批次。size參數在命令行上使用--max-partition-memory-bytes選項設置,如果未指定,則爲16384. 這意味着即使使用--producer-property或--producer.config指定了linger.ms和batch.size,它們也將是總是被上述「特定」選項所覆蓋。

相關問題