1

Kafka를 사용하여 Spark Streaming 작업을 실행하려고합니다.Kafka + Spark 스트리밍 : ClosedChannelException

Kafka를 통해 csv 로그 파일을 보내어 Spark Streaming 응용 프로그램에 메시지를 게시합니다.

이를 달성하기 위해 Spark Streaming 응용 프로그램에서 직접 접근 방식을 사용합니다.

처음에는 로그 파일의 데이터가 좋았지 만 잠시 후 스칼라 IDE에 다음 오류 메시지가 표시됩니다.

환경 : Spark를 모든 코어에서 로컬로 실행하고 있습니다. 사육사, 카프카도 내 시스템에서 로컬로 운영되고 있습니다.

ERROR :

16/09/05 17:53:28 ERROR Executor: Exception in task 0.0 in stage 390.0 (TID 390) 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) 
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 

무엇 발생하면이 예외 중순 방법을 발생합니다 때로는 로그 데이터의 무리가 콘솔에이 오류 메시지가 다음입니다.

네트워크 오류처럼 보입니다. "Closed Channel Exception"이지만 로컬에서 모든 프로세스를 실행하고 있기 때문에 근본적인 원인이 있는지 궁금합니다.

이 문제를 해결하기위한 몇 가지 지침을 얻을 수 있다면 좋을 것입니다.

+0

아래 도움말이 도움이 되었습니까? –

+0

해결 방법을 찾으셨습니까? – dirceusemighini

+0

@dirceusemighini 게시물의 답변보기 http://stackoverflow.com/questions/35807844/kafka-consumer-simpleconsumer-reconnect-due-to-socket-error-java-nio-channels/43105342#43105342, 도움이 될 수 있습니다. – Nietzsche

답변

0

kafka producer.properties config (예 : metadata.broker.list)의 localhost를 시스템 IP로 바꿉니다. 또한 등 /에/호스트가 대체 파일 :

127.0.0.1의 로컬 호스트 localhost.localdomain에게

와 x.x.x.x 로컬 호스트 localhost.localdomain

x.x.x.x는 컴퓨터의 IP입니다

. 도움이되는지 확인하십시오.

+0

안녕하세요, 의견을 보내 주셔서 감사합니다. 나는 당신이 말한 것을 시도했지만 불행히도 여전히 같은 오류를주고 있습니다. My Kafka가 Windows 10에 설치되어 있으므로 내 컴퓨터에서/etc/hosts를 찾을 수 없습니다. 그러나 표시된대로 producer.properties 파일을 변경했습니다. metadata.broker.list = xx.xx.xx.xx : xxxx – Amna