2016-07-25 4 views
0

이 코드를 실행하려했으나 producer.send()가 KeyedMessage 유형을 허용하지 않기 때문에 작동하지 않습니다.producer.send는 KeyedMessage 유형을 허용하지 않습니다.

kafka.producer.Producer 대신 kafka.javaapi.producer.Producer를 가져 오려고했습니다. 하지만 여전히

작동하지 않는 코드는 다음과 같습니다

package sources; 
import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.Properties; 

//import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import kafka.javaapi.producer.Producer; 
//import kafka.producer.Producer; 


public class ProducerCode { 


private static Producer<Integer, String> producer; 
private static final String topic= "mytopic"; 

public void initialize() { 
    Properties producerProps = new Properties(); 
    producerProps.put("metadata.broker.list", "localhost:9092"); 
    producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
    producerProps.put("request.required.acks", "1"); 
    // ProducerConfig producerConfig = new ProducerConfig(producerProps); 
    // have a change here ** 
    producer = new Producer<Integer, String>(new ProducerConfig(producerProps)); 

} 당신은 생성자 (대신 KeyedMessage의) ProducerRecord를 사용할 필요가

public void publishMesssage() throws Exception{    
    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));    
while (true){ 
    System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): "); 
    String msg = null; 
    msg = reader.readLine(); // Read message from console 
    //Define topic name and message 
    KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String>(topic, msg); 

    producer.send(keyedMsg); 
    // producer.send(keyedMsg); // This publishes message on given topic 

    if("Y".equals(msg)){ break; } 
    System.out.println("--> Message [" + msg + "] sent.Check message on Consumer's program console"); 
} 
return; 
} 




public static void main(String[] args) throws Exception { 

    KafkaProducer kafkaProducer = new KafkaProducer(); 
    // Initialize producer 
    kafkaProducer.initialize();    
    // Publish message 
    kafkaProducer.publishMesssage(); 
    //Close the producer 
    producer.close(); 

} 

} 
+0

어떤 카프카 버전을 사용하고 있습니까? –

+0

버전 10 (kafka-0.10.0.0)을 사용 중입니다. – Shaimaa

답변