0
이 코드를 실행하려했으나 producer.send()가 KeyedMessage 유형을 허용하지 않기 때문에 작동하지 않습니다.producer.send는 KeyedMessage 유형을 허용하지 않습니다.
kafka.producer.Producer 대신 kafka.javaapi.producer.Producer를 가져 오려고했습니다. 하지만 여전히
작동하지 않는 코드는 다음과 같습니다
package sources;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;
//import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
//import kafka.producer.Producer;
public class ProducerCode {
private static Producer<Integer, String> producer;
private static final String topic= "mytopic";
public void initialize() {
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "localhost:9092");
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
producerProps.put("request.required.acks", "1");
// ProducerConfig producerConfig = new ProducerConfig(producerProps);
// have a change here **
producer = new Producer<Integer, String>(new ProducerConfig(producerProps));
} 당신은 생성자 (대신 KeyedMessage
의) ProducerRecord
를 사용할 필요가
public void publishMesssage() throws Exception{
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true){
System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): ");
String msg = null;
msg = reader.readLine(); // Read message from console
//Define topic name and message
KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String>(topic, msg);
producer.send(keyedMsg);
// producer.send(keyedMsg); // This publishes message on given topic
if("Y".equals(msg)){ break; }
System.out.println("--> Message [" + msg + "] sent.Check message on Consumer's program console");
}
return;
}
public static void main(String[] args) throws Exception {
KafkaProducer kafkaProducer = new KafkaProducer();
// Initialize producer
kafkaProducer.initialize();
// Publish message
kafkaProducer.publishMesssage();
//Close the producer
producer.close();
}
}
어떤 카프카 버전을 사용하고 있습니까? –
버전 10 (kafka-0.10.0.0)을 사용 중입니다. – Shaimaa