2017-10-19 16 views
0

콘솔 Kafka 제작자에서 Hadoop 파일 시스템 (HDFS)으로 간단한 데이터 파이프 라인을 설정하려고합니다. 나는 64 비트 Ubuntu 가상 머신에서 일하고 있으며, 내가 따라온 가이드에 의해 제안 된 것처럼 Hadoop과 Kafka 모두에 대해 별도의 사용자를 만들었습니다. 콘솔 소비자와 함께 카프카에서 생산 된 자료를 소비하고 HDFS가 가동되고있는 것처럼 보입니다.Flume의 Kafka의 EOFException

이제 Flume을 사용하여 입력을 HDFS로 파이프 처리하려고합니다.

2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:467)] Completed connection to node 2147483647 
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)] Connection with Ubuntu-Sandbox/127.0.1.1 disconnected 
java.io.EOFException 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 
    at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:529) 
    at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83) 
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71) 
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 
: 나는

bin/flume-ng agent --conf ./conf -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n tier1 

내가 또 다시 콘솔 출력에서 ​​같은 예외가 다음 명령을 사용하여 수로를 실행할 때 이제

tier1.sources = source1 
tier1.channels = channel1 
tier1.sinks = sink1 

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181 
tier1.sources.source1.topic = test 
tier1.sources.source1.groupId = flume 
tier1.sources.source1.channels = channel1 
tier1.sources.source1.interceptors = i1 
tier1.sources.source1.interceptors.i1.type = timestamp 
tier1.sources.source1.kafka.consumer.timeout.ms = 2000 

tier1.channels.channel1.type = memory 
tier1.channels.channel1.capacity = 10000 
tier1.channels.channel1.transactionCapacity = 1000 

tier1.sinks.sink1.type = hdfs 
tier1.sinks.sink1.hdfs.path = hdfs://flume/kafka/%{topic}/%y-%m-%d 
tier1.sinks.sink1.hdfs.rollInterval = 5 
tier1.sinks.sink1.hdfs.rollSize = 0 
tier1.sinks.sink1.hdfs.rollCount = 0 
tier1.sinks.sink1.hdfs.fileType = DataStream 
tier1.sinks.sink1.channel = channel1 

: 나는 다음과 같은 구성 파일을 사용하고 있습니다

Flume을 중지하는 유일한 방법은 Java 프로세스를 중지하는 것입니다.

나는 그것이 하둡과 카프카를위한 별도의 사용자와 관련이있을 것이라고 생각했지만, 카프카 사용자와 함께 모든 것을 실행해도 동일한 결과를 얻습니다. 나는 EOFException 메서드에 관한 어떤 것도 발견하지 못했다. 이상하게도 "Getting Started"가이드를 따르고 모든 것을위한 꽤 표준적인 구성을 사용했다는 것을 고려하면 이상하다.

아마도 위의 줄 ("Ubuntu-Sandbox/127.0.1.1 disconnected")과 관련이있어서 내 VM의 구성과 관련이 있을까요?

도움이 되었으면 좋겠습니다.

답변

0

대신 Kafka Connect (Apache Kafka의 일부) 및 HDFS connector을 사용 해본 적이 있습니까? 이것은 일반적으로 Flume을 대체 한 것으로 보입니다. Flume과 비슷한 파일 기반 구성으로 사용하기 쉽습니다.

+0

조언 해 주셔서 감사합니다. Robin. Confluent에 익숙해졌으며 모든 것을 더 쉽게 만들어주는 것으로 보입니다. 그러나 다시 한번 나는 빠른 시작 가이드를 따라 가면서 Kafka에서 HDFS로 데이터를 쓸 수는 없다. 이번에도 나는 예외가 없으며 "connect-standalone"프로세스가 끝나지 않을 것이다. HDFS의 폴더 - 생성 되었음에도 불구하고 - 비어 있습니다 ... 이것은 정말 실망 스럽습니다! – stefanS