2017-02-28 11 views
1

JMS 소스에서 데이터를 읽고 KAFKA 항목으로 푸시하려고하는데, 몇 시간 후 KAFKA 항목으로 푸시 빈도가 거의 0 그리고 초기 분석 후 나는 FLUME 로그에서 예외를 발견했다.org.apache.kafka.common.errors.RecordTooLargeException Flume 카프카 싱크에서

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to publish events 
     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252) 
     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) 
     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 
     at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686) 
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449) 
     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212) 
     ... 3 more 
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 

내 수로는 이러한 예외를 제거하지만 드릴 수 없습니다 수도 있습니다 max.request.size을 증가 명확 1,399,305에 비해 매우 작 1048576로 max.request.size에 대한 (로그) 현재 설정 값을 보여줍니다 해당 값을 업데이트하기위한 올바른 위치를 찾으십시오.

내 flume.config,

a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 

a1.channels.c1.type = file 
a1.channels.c1.transactionCapacity = 1000 
a1.channels.c1.capacity = 100000000 
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint 
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data 

a1.sources.r1.type = jms 

a1.sources.r1.interceptors.i1.type = timestamp 
a1.sources.r1.interceptors.i1.preserveExisting = true 

a1.sources.r1.channels = c1 
a1.sources.r1.initialContextFactory = some context urls 
a1.sources.r1.connectionFactory = some_queue 
a1.sources.r1.providerURL = some_url 
#a1.sources.r1.providerURL = some_url 
a1.sources.r1.destinationType = QUEUE 
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user 
a1.sources.r1.passwordFile= passwd 

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic = some_kafka_topic 
a1.sinks.k1.kafka.bootstrap.servers = some_URL 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.flumeBatchSize = 1 
a1.sinks.k1.channel = c1 

어떤 도움은 정말 이해할 수있을 것이다!

답변

0

이 변경 사항은 카프카에서 수행해야합니다. 난 내 문제를 해결 한 것 같다

max.request.size=10000000 
+0

FLAE는 KAFKA의 제작자 lib를 사용하여 주제에 대한 메시지를 푸시합니다.하지만 FLUME을 구성 가능한 것으로 간주합니다. 하드 코딩 된 값을 생성자 클래스로 변경해야합니까? –

+0

@RiteshSharma 카프카가 서버에 설치되어 있지 않다는 말씀인가요? – franklinsijo

+0

사실이 "max.request.size"문제는 kafka sink를 사용하여 kafka 브로커의 데이터를 푸시하는 FLUME에서 발생합니다. 기본적으로 FLUME는 카프카 프로듀서 라이브러리 (카프카 싱크)를 사용하여 카프카 브로커의 데이터를 푸시합니다. FLUME은 전용 구성 파일을 "producer.properties"로 제공하지 않으므로 FLUME 구성에서만 카프카 제작자 등록 정보를 업데이트해야합니다. –

1

같은 더 큰 값으로 카프카 프로듀서 구성 파일 producer.properties 업데이트 ; max.request.size이 증가한 것으로 의심되는 경우 해당 카프카 싱크 (프로듀서) 속성을 업데이트하기 위해 예외가 제거되었습니다. FLUME은 상수 프리픽스를 "kafka.producer"로 제공합니다. 그리고이 상수 앞에 kafka 속성을 추가 할 수 있습니다.

내 광산은 다음과 같습니다. a1.sinks.k1.kafka.producer.max.request.size = 5271988.

+0

와우. 이것이 가능하다는 것을 결코 알지 못했습니다! – franklinsijo