2016-11-29 62 views
2

我打電話火花提交經過MAXRATE,我有一個室壁運動接收器,以及1秒星火流+室壁運動:接收機MAXRATE違反

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

批次但是一個批次可以大大超過的配合建立MAXRATE。即:我得到300條記錄。

我是否缺少任何設置?

回答

2

這看起來像一個錯誤給我。從代碼中的角度來看,它看起來像Kinesis完全忽略了spark.streaming.receiver.maxRate配置。

如果你看裏面0​​,你會看到:

val kinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId) 
    .withKinesisEndpoint(endpointUrl) 
    .withInitialPositionInStream(initialPositionInStream) 
    .withTaskBackoffTimeMillis(500) 
    .withRegionName(regionName) 

此構造結束調用其對於配置了很多默認值的另一個構造:

public KinesisClientLibConfiguration(String applicationName, 
     String streamName, 
     AWSCredentialsProvider kinesisCredentialsProvider, 
     AWSCredentialsProvider dynamoDBCredentialsProvider, 
     AWSCredentialsProvider cloudWatchCredentialsProvider, 
     String workerId) { 
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, 
      dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, 
      DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, 
      DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, 
      DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, 
      new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), 
      DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, 
      DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); 
} 

您所關心的一個是DEFAULT_MAX_RECORDS,它不斷設置爲10,000條記錄。您調用KinesisClientLibConfiguration時調用withMaxRecords的方法來設置實際的記錄數。這應該是一個簡單的修復。

但現在看來,Kinesis接收器似乎並不尊重該參數。

+0

肯定,這是對問題的原因!謝謝 –

2

供將來參考。

這是一個已知bug固定在Spark 2.2.0發佈