2017-05-01 2 views
0

kafka에서 spark 스트리밍 응용 프로그램으로 CSV 파일을 보내려고하고 있는데 어떻게해야할지 모르겠습니다. 나는 많은 게시물을 여기에서 읽었다. 그러나 아무도 나를 도왔다.CSV를 Kafka에서 Spark Streaming으로 보내기

내 kafka 제작자가 csv를 보내고 나중에 응용 프로그램 (소비자)에서 분할하고 싶지만 중요하지 않습니다. 나는 RDD를 만들고 스파크로 보내려고했다. 정상적인 문자열 메시지에서 작동하지만 csv이 아닙니다.

message =sc.textFile("/home/guest/host/Seeds.csv")  
producer.send('test', message) 

그리고 내 스파크 소비자 :

내 프로듀서

ssc = StreamingContext(sc, 5) 

kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {'test': 1}) data = kvs.map(lambda x: x[1]) counts = data.flatMap(lambda line: line.split(";")) \

.map(lambda word: (word, 1)) \ 
.reduceByKey(lambda a, b: a+b) 

문제는 CSV를 전송하여, 돈을 streamming 불꽃이다 어떤 이벤트가 발생하지 않습니다. 다른 사람들이 형식이나 개념을 도울 수 있습니까?

도커 컨테이너 아래에서 파이썬이있는 노트북에서 프로듀서와 소비자를 실행 중입니다.

감사합니다.

답변

0

생산자의 메시지는 느리게 평가되는 RDD (클러스터를 통해 배포되는 csv 파일 행 모음)입니다. 즉, 작업을 수행 할 때까지 아무것도 수행하지 않습니다. 따라서 카프카 (Kafka)로 보내기 전에 RDD를 수집해야합니다. 아래 링크를 참조하십시오. how to properly use pyspark to send data to kafka broker?