2016-11-29 2 views

답변

2

이것은 나에게 버그처럼 보입니다. 코드에서 파고 들자 키네시스가 spark.streaming.receiver.maxRate 구성을 완전히 무시하고있는 것처럼 보입니다. 당신이 KinesisReceiver.onStart 내부를 보면

, 당신은 참조하십시오

public KinesisClientLibConfiguration(String applicationName, 
     String streamName, 
     AWSCredentialsProvider kinesisCredentialsProvider, 
     AWSCredentialsProvider dynamoDBCredentialsProvider, 
     AWSCredentialsProvider cloudWatchCredentialsProvider, 
     String workerId) { 
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, 
      dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, 
      DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, 
      DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, 
      DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, 
      new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), 
      DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, 
      DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); 
} 

관심있는 하나

val kinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId) 
    .withKinesisEndpoint(endpointUrl) 
    .withInitialPositionInStream(initialPositionInStream) 
    .withTaskBackoffTimeMillis(500) 
    .withRegionName(regionName) 

이 생성자는 구성에 대한 기본값을 많이 가지고 다른 생성자를 호출 끝 DEFAULT_MAX_RECORDS은 10,000 레코드로 계속 설정됩니다. KinesisClientLibConfigurationwithMaxRecords이라는 메서드가 있는데 실제 레코드 수를 설정하기 위해 호출합니다. 이것은 쉬운 수정이어야합니다.

하지만 지금은 키네시스 수신기가 매개 변수를 준수하지 않는 것 같습니다.

+0

그 문제의 원인이었던에서 해결! 감사 –

2

나중에 참조 할 수 있도록.

이 알려진 bugSpark 2.2.0 출시

확실히