2013-03-04 1 views
4

며칠 동안 Apache Kafka와 함께 놀았습니다. 내 문제는 입니다. "빠른 시작"섹션에서 설명하는 로컬 테스트를 설정하면 웹 사이트에서 모든 것이 잘되고, 카프카 제작자/소비자, 사육사 서버 및 카프카 브로커가 완벽하게 작동합니다. 지금은 원격 서버에서 실행하는 경우Kafka 제작자의 원격 사육사에게 연결할 수 없습니다

(의는 노드 2를 부르 자) : - 사육사 - 포트 2181 - 카프카 브로커 - 포트 9092 - 카프카 소비자를

그리고 내 로컬 컴퓨터에서 실행하는 경우 : - 카프카 제작자

node2에 방화벽이 없다고 가정합니다. 연결이 만료되어 종료됩니다. 여기

오류 로그입니다 :

/etc/java/jdk1.6.0_41/bin/java -Didea.launcher.port=7533 -Didea.launcher.bin.path=/home/kevin/Documents/idea-IU-123.169/bin -Dfile.encoding=UTF-8 -classpath /etc/java/jdk1.6.0_41/lib/dt.jar:/etc/java/jdk1.6.0_41/lib/tools.jar:/etc/java/jdk1.6.0_41/lib/jconsole.jar:/etc/java/jdk1.6.0_41/lib/htmlconverter.jar:/etc/java/jdk1.6.0_41/lib/sa-jdi.jar:/home/kevin/Desktop/kafka-0.7.2/examples/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-compiler.jar:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-library.jar:/home/kevin/Desktop/kafka-0.7.2/core/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Documents/idea-IU-123.169/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain kafka.examples.KafkaConsumerProducerDemo 
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). 
log4j:WARN Please initialize the log4j system properly. 
Exception in thread "Thread-0" java.net.ConnectException: Connection timed out 
    at sun.nio.ch.Net.connect(Native Method) 
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532) 
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:173) 
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:92) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:125) 
    at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114) 
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100) 
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) 
    at kafka.producer.ProducerPool.send(ProducerPool.scala:100) 
    at kafka.producer.Producer.zkSend(Producer.scala:137) 
    at kafka.producer.Producer.send(Producer.scala:99) 
    at kafka.javaapi.producer.Producer.send(Producer.scala:103) 
    at kafka.examples.Producer.run(Producer.java:53) 

Process finished with exit code 0 

그리고 여기 내 프로듀서의 코드입니다 :

import java.util.Properties; 
import kafka.javaapi.producer.ProducerData; 
import kafka.producer.ProducerConfig; 


public class Producer extends Thread{ 

    private final kafka.javaapi.producer.Producer<String, String> producer; 
    private final String topic; 
    private final Properties props = new Properties(); 

    public Producer(String topic) 
    { 
    props.put("zk.connect", "node2:2181"); 
    props.put("connect.timeout.ms", "5000"); 
    props.put("socket.timeout.ms", "30000"); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("producer.type", "sync"); 
    props.put("conpression.codec", "0"); 
    producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props)); 
    this.topic = topic; 
    } 

    public void run() { 
     String messageStr = new String("Message_test"); 
     producer.send(new ProducerData<String, String>(topic, messageStr)); 
    } 
} 

** 그래서 나는 또한

에 의해

props.put("zk.connect", "node2:2181"); 

를 전환 시험

props.put("broker.list", "0:node2:9082"); 

그리고이 경우에 나는 성공적으로 연결할 수 있습니다. ** 것은

+0

아직 문제를 해결할 수 있습니까 ?? –

답변

3

페이지의 해결 방법은 명시 적으로 카프카

의 server.properties에서 호스트 이름 속성을 설정하는 것입니다 3

http://kafka.apache.org/faq.html에서 항목 번호는

이 작업을 확인할 수 있습니다 사육사를 사용하여. kafka 0.7 *을 사용하는 경우 ZkCli 콘솔을 열고/brokers/ids/0을 가져 오면 모든 브로커 메타 데이터를 가져와야합니다. Zk와 제조자 코드에서 사용하는 연결 문자열 일치 여기에 확인 IP 주소/호스트 이름을 확인 - 내 경우

props.put("zk.connect", "node2:2181"); 

를, I는 우분투 VM에 연결 내 로컬 컴퓨터에서 실행중인 프로듀서 (동일한 상자를 사용했다 다른 IP)이 해결 방법이 도움이되었습니다.