2013-07-23 1 views
3

카프카 서버에 메시지를 보내려고합니다. 브로커 목록에 localhost가 포함되어 있지 않습니다. 로컬 호스트에 프로듀서 연결 : 생산은 send 메소드를 호출 할 때, 그것의 예외가 9092 실패를카프카 자바 프로듀서를 사용하여 메시지 보내기 : 로컬 호스트에 프로듀서 연결 : 9092 실패

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Properties; 

// create a producer 

Properties props = new Properties(); 

props.put("metadata.broker.list", "192.168.1.203:9092"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 
props.put("request.required.acks", "1"); 

ProducerConfig config = new ProducerConfig(props); 

producer = new Producer<String, String>(config); 

//sending... 
String topic = "data_collect_events"; 
String message = "_Message_1"; 
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message); 
producer.send(keyedMessage); 

예외 :

ERROR [main] 2013-07-23 19:27:10,580 kafka.utils.Logging$class: Producer connection to localhost:9092 unsuccessful 
java.net.ConnectException: Connection refused: connect 
    at sun.nio.ch.Net.connect0(Native Method) 
    at sun.nio.ch.Net.connect(Net.java:364) 
    at sun.nio.ch.Net.connect(Net.java:356) 
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623) 
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) 
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) 
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100) 
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244) 
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107) 
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101) 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:631) 
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) 
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) 
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) 
    at enter code herekafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101) 
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) 
    at kafka.producer.Producer.send(Producer.scala:74) 
    at kafka.javaapi.producer.Producer.send(Producer.scala:32) 

답변

4

당신은 카프카 0.8에 대한 설정 키 카프카 0.7을 사용하는 대부분의 경우, 그래서

props.put("metadata.broker.list", "192.168.1.203:9092"); 

단순히 로컬 호스트를 무시하고 생산자 기본값입니다 : 9092에는 브로커가 지정되지 않은 경우.
이 아닌 You have to usebroker.list입니다.

4

카프카 0.8에서 중개인 목록은 메타 데이터 검색에만 사용됩니다. 그러면 생성자는 반환 된 메타 데이터 정보를 사용하여 브로커에 연결합니다. 메타 데이터의 호스트 이름은 호스트의 OS 설정에 따라 다릅니다 ( https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F 참조).

server.properties에서 host.name을 설정할 수 있습니다. 이

host.name=192.168.1.203 
0

처럼 나는 사육사에 의해 예상대로 EC2 외부에있는 카프카 생산에 내부 EC2 IP 해결할 수 없습니다 비슷한 문제가 있었다. 생산자는 카프카 브로커에게 이야기 할 public-ip & internal-ec2-ip :

나는의 항목을 추가 /etc/hosts 파일을 편집했다.