내가 maxRate을 통과 스파크가 제출 부르고 위반,과 1스파크 스트리밍 + 운동성 : 수신기 MaxRate은 내가 하나의 운동성 수신기가,
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
일괄 그러나 하나의 배치는 크게에 stablished 초과 할 수 maxRate. 즉 : 300 개의 레코드를 얻는 임.
설정이 누락 되었습니까?
내가 maxRate을 통과 스파크가 제출 부르고 위반,과 1스파크 스트리밍 + 운동성 : 수신기 MaxRate은 내가 하나의 운동성 수신기가,
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
일괄 그러나 하나의 배치는 크게에 stablished 초과 할 수 maxRate. 즉 : 300 개의 레코드를 얻는 임.
설정이 누락 되었습니까?
이것은 나에게 버그처럼 보입니다. 코드에서 파고 들자 키네시스가 spark.streaming.receiver.maxRate
구성을 완전히 무시하고있는 것처럼 보입니다. 당신이 KinesisReceiver.onStart
내부를 보면
, 당신은 참조하십시오
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
관심있는 하나
는val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
이 생성자는 구성에 대한 기본값을 많이 가지고 다른 생성자를 호출 끝 DEFAULT_MAX_RECORDS
은 10,000 레코드로 계속 설정됩니다. KinesisClientLibConfiguration
에 withMaxRecords
이라는 메서드가 있는데 실제 레코드 수를 설정하기 위해 호출합니다. 이것은 쉬운 수정이어야합니다.
하지만 지금은 키네시스 수신기가 매개 변수를 준수하지 않는 것 같습니다.
나중에 참조 할 수 있도록.
이 알려진 bug는 Spark 2.2.0
출시
그 문제의 원인이었던에서 해결! 감사 –