0
제작자가 보낸 데이터가 소비자에게 전달되지 않는 이유를 알 수 없습니다. cloudera 가상 시스템을 연구 중입니다. 저는 생산자가 Kafka를 사용하고 소비자가 스파크 스트리밍을 사용하는 간단한 제작자 소비자를 작성하려고합니다.Kafka 및 Spark Streaming Simple Producer Consumer
스칼라의 생산자 코드 : 스칼라에서
import java.util.Properties
import org.apache.kafka.clients.producer._
object kafkaProducer {
def main(args: Array[String]) {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val TOPIC = "test"
for (i <- 1 to 50) {
Thread.sleep(1000) //every 1 second
val record = new ProducerRecord(TOPIC, generator.getID().toString(),generator.getRandomValue().toString())
producer.send(record)
}
producer.close()
}
}
소비자 코드 :
val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1))
: 문제는 소비자의 코드 라인을 변경하여 해결
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
import java.util.Properties
import kafka.producer._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
object kafkaConsumer {
def main(args: Array[String]) {
var totalCount = 0L
val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AnyName").set("spark.driver.host", "localhost")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1))
stream.foreachRDD((rdd: RDD[_], time: Time) => {
val count = rdd.count()
println("\n-------------------")
println("Time: " + time)
println("-------------------")
println("Received " + count + " events\n")
totalCount += count
})
ssc.start()
Thread.sleep(20 * 1000)
ssc.stop()
if (totalCount > 0) {
println("PASSED")
} else {
println("FAILED")
}
}
}
나는 생산자와 소비자를 순차적으로 시작한다고 생각하십니까 ??? – nabongs
예, 저는 소비자가 실행되는 동안 소비자와 생산자를 시작합니다. – MennatAllahHany
콘솔 생산자가 생산자 코드를 테스트했고 콘솔 생산자가 소비자 코드를 테스트 했습니까? Kafka - Spark 통합이 까다로울 수 있습니다 ... –