我打電話火花提交經過MAXRATE,我有一個室壁運動接收器,以及1秒星火流+室壁運動:接收機MAXRATE違反
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
批次但是一個批次可以大大超過的配合建立MAXRATE。即:我得到300條記錄。
我是否缺少任何設置?
我打電話火花提交經過MAXRATE,我有一個室壁運動接收器,以及1秒星火流+室壁運動:接收機MAXRATE違反
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
批次但是一個批次可以大大超過的配合建立MAXRATE。即:我得到300條記錄。
我是否缺少任何設置?
這看起來像一個錯誤給我。從代碼中的角度來看,它看起來像Kinesis完全忽略了spark.streaming.receiver.maxRate
配置。
如果你看裏面0,你會看到:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
此構造結束調用其對於配置了很多默認值的另一個構造:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
您所關心的一個是DEFAULT_MAX_RECORDS
,它不斷設置爲10,000條記錄。您調用KinesisClientLibConfiguration
時調用withMaxRecords
的方法來設置實際的記錄數。這應該是一個簡單的修復。
但現在看來,Kinesis接收器似乎並不尊重該參數。
供將來參考。
這是一個已知bug固定在Spark 2.2.0
發佈
肯定,這是對問題的原因!謝謝 –