1
나는 kafka 및 spark를 사용하여 메시지를 사용하려고하는 코드를 작성하고 있습니다. 하지만 내 코드가 작동하지 않습니다. logger (org.apache.kafka.clients.producer.ProducerConfig)에 대한 appender가 없습니다.
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util._
object Smack_Kafka_Spark extends App {
def main(args: Array[String]) {
val kafkaBrokers = "localhost:2181"
val kafkaOpTopic = "test"
/*val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")*/
val props = new Properties()
props.put("bootstrap.servers", "localhost:2181")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
var spark: SparkSession = null
val textFile: RDD[String] = spark.sparkContext.textFile("dataset.txt")
textFile.foreach(record => {
val data = record.toString
val message = new ProducerRecord[String, String](kafkaOpTopic, null, data)
producer.send(message)
})
producer.close()
}
}
이것은 내가 가진 오류는 다음과 같습니다 : 여기 내 코드입니다
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
at Smack_Kafka_Spark$.main(Smack_Kafka_Spark.scala:25)
at Smack_Kafka_Spark.main(Smack_Kafka_Spark.scala)
내가 어떤 도움을 매우 감사하게 될 것입니다!
감사합니다. 당신이 말했듯이, 연재 문제가 나타났습니다. 그리고 제가 언급 한 링크를 따라갔습니다. – edkeveked
이 정정 코드는이 코드는 보정 브로 TEXTFILE : RDD [문자열] = spark.sparkContext.textFile ("Dataset.txt") textFile.foreachPartition ((partisions : 반복자 [문자열) => { producer.send (new ProducerRecord [String, String]) { } 제작자 : KafkaProducer [문자열, 문자열] = 새 KafkaProducer [문자열, 문자열] (소품) partisions.foreach (줄 : String) => { { ("테스트", 선)) } 잡기 { 경우 예 : 예외 => { } } })}) – edkeveked
당신은 내가 추가하는 방법을 말씀 해주십시오 수있는 소비 이 코드에서 프로듀서가 보낸 내용을 검색하려면? – edkeveked