2017-01-03 7 views
1

kafka 커넥터 3.0.1 버전을 사용하고 있습니다. new-group이라는 새 그룹을 만들고 약 20 개의 주제가 있습니다. 내가 커넥터 프레임 워크를 시작할 때 시스템이 모든 주제에 대해 약 2 분의 재조정을 수행하여 재조정을 중지 할 수 없다는 점이 유감입니다. 나는 그 이유를 모른다. 오류 메시지의 일부는 다음과 같습니다Confluent Kafka 커넥터 - 재조정을 중지 할 수 없습니다.

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180) 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) 
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
: 

는 연속 재조정과 아무 상관이 있는지 모르겠다.

KafkaConsumer.poll()이 구성된 시간 초과보다 길면 kafka가 파티션을 취소하고 재조정이 시작되지만 각 시간마다의 폴링이 그렇게 길지 않다는 것을 알고 있습니다. . 누구나 내게 단서를 줄 수 있습니까?

답변

1

생각해 보면 max.poll.records이 해결할 수 있습니다. 모든 루프 반복에서 처리해야하는 레코드의 수를 조정하는 것입니다. 0.10에는 max.poll.records이 있으며, 이는 각 호출에서 반환 된 레코드 수의 상한을 설정합니다.

또한 confluent에 따라 consumer.poll()은 30-60 초와 같이 상당히 긴 세션 시간 제한을 가져야합니다.

또한 미세 조정 할 수 있습니다 :

session.timeout.ms 
heartbeat.interval.ms 
max.partition.fetch.bytes 
+0

Yes (예)로 변경되었습니다 투표 결과를 hdfs에 넣을 시간, 재조정이 올 것입니다. 나는 코드를 최적화했으며 재조정은 거의 없습니다. – wuchang

0

는 소비자가 더 나은 폴링 호출 사이에 긴 기간을 처리하기 위해이 릴리스에서 향상 되었기 때문에 0.10.1 이상으로 업그레이드 고려().

HDFS에 결과를 저장하는 데 5 분 이상 걸리는 경우 새 max.poll.interval.ms 매개 변수를 늘릴 수 있습니다. 이렇게하면 소비자가 진전을 보이지 않아 소비자 그룹에서 쫓겨나지 않게됩니다. 0.10.1 릴리스에서는

은 새로운 자바 소비자는 이제 배경 스레드에서 하트 비트 지원

을 말한다 지적한다. 새 구성 max.poll.interval.ms가 있습니다. 은 소비자 이 사전에 그룹을 떠날 때 (기본적으로 5 분) 폴링 호출 사이의 최대 시간을 제어합니다. 값 request.timeout.ms은 max.poll.interval.ms보다 커야합니다. 소비자가 재조정하는 동안 JoinGroup 요청이 서버에서 차단 될 수있는 최대 시간이기 때문에 우리는 기본값을 5 분 이상으로 변경했습니다. 마지막으로, session.timeout.ms의 기본 값은 10 초까지 조정 된, 내가 너무 많이 걸릴 때 max.poll.records의 기본 값은 (500)