3

스트리밍 중 하나에서 사용합니다. 우리의 센서 소프트웨어 중 하나는 스파크 스트리밍이 예외적으로 충돌하는 카프카에서 메시지를 읽으려고 할 때 ~ 20MB JSON 메시지/초를 50 개의 파티션이있는 카프카 항목으로 전송합니다. 상황을 더 잘 이해하기 위해 센서 소프트웨어가 1 초당 20MB의 메시지 만 전송하도록 설정했지만 동일한 오류로 인해 애플리케이션 충돌이 발생합니다. 이러한 시나리오를 처리하기 위해 수행해야 할 작업이 누락되면 알려 주시기 바랍니다.종료 오프셋 예외에 도달하기 전에 Spark Streaming이 메시지에서 Kafka Ran으로 충돌합니다.

우리는

val kafkaParams = Map[String, String](
     "security.protocol" -> "SASL_PLAINTEXT", 
     "group.id" -> groupid, 
     "metadata.broker.list" -> kafkaBrokerList, 
     "max.partition.fetch.bytes" -> "60000000") 

-Spark 원사에 다음과 같은 설정

-Kafka 0.9.0 server.properties

message.max.bytes=60000000 
replica.fetch.max.bytes=120000000 

-Spark 1.6.1 구성 DirectAPI를 제출 한

spark-submit \ 
--verbose \ 
--master yarn-cluster \ 
--num-executors 3 \ 
--executor-memory 7g \ 
--executor-cores 3 \ 
--conf spark.driver.memory=1024m \ 
--conf spark.streaming.backpressure.enabled=false \ 
--conf spark.streaming.kafka.maxRatePerPartition=3 \ 
--conf spark.streaming.concurrentJobs=3 \ 
--conf spark.speculation=true \ 
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \ 
--files kafka_jaas.conf#kafka_jaas.conf,user.headless.keytab#user.headless.keytab \ 
--driver-java-options "-Djava.security.auth.login.config=./kafka_jaas.conf -Dhttp.proxyHost=PROXY_IP -Dhttp.proxyPort=8080 -Dhttps.proxyHost=PROXY_IP -Dhttps.proxyPort=8080 -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-driver.properties" \ 
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_jaas.conf -Dlog4j.configuration=file:/home/user/spark-log4j/log4j-topic_name-executor.properties" \ 
--class com.spark.demo.StreamProcessor /home/user/demo.jar /tmp/data/out 60 KAFKA_BROKER:6667 "groupid" topic_name 

- 예외 :

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, IP_HOST): java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 197 for topic x_topic_3 partition 24 start 196. This should not happen, and indicates that messages may have been lost 
at scala.Predef$.assert(Predef.scala:179) 
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211) 
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335) 
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1335) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Driver stacktrace 

:

답변

0

이 kafkaParams에 ("fetch.message.max.bytes" -> "20971520")을 추가, 당신은 ConsumerConfig.scala#114 (스파크 1.6.2) 나는 단순한 소비자`최대를 들어, 단순한 소비자를 사용 DirectAPI을 사용하고

+0

에서 소스 코드를 찾을 수 있습니다 .partition.fetch.bytes'는 내가 맞으면 max message byte를 바로 가져올 수있는 설정입니다. – nilesh1212

+0

KafkaRDD.scala # fetecBatch (190 행)에서'.addFetch (part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)'를 찾을 수있는'kc.config.fetchMessageMaxBytes' * 키 *는 ** fetch.message.max.bytes **입니다. – klion26

+0

그럼 DirectAPI에 대한 'max.partition.fetch.bytes'와'fetch.message.max.byte '의 차이는 무엇입니까 – nilesh1212