1

spark-shell 내부에서 spark 구조화 된 스트리밍을 통해 kafka 주제를 읽으려고하고 있지만 kafka에서 어떤 라인도 얻지 못하는 것처럼 보입니다.readStream kafka는 아무런 값도 얻지 못합니다.

카프카 혼자 잘 작동 (콘솔 소비자와 콘솔 프로듀서 테스트) : 나는 '

ds1 = spark 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:2181") 
    .option("subscribe", "testtopic") 
    .option("startingOffsets" , "earliest") 
    .load() 

ds1.writeStream.format("console").start 

: 이것은 스파크 쉘에서 실행 해요 코드가

~/opt/bd/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testtopic --from-beginning 
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. 
first 
thrid 
fifth 
seventh 
eight 
bla 
blal2 
das ist 
testmaschine 
hallo 
kleiner 
blsllslsd 

나는이 주제에 대해 이미 kafka에 저장된 메시지를 받고 모든 메시지가 spark 셸에 인쇄 될 것으로 기대하고 있습니다. 그러나 인쇄 된 것은 없습니다. 내 실수는 어디에 있습니까? Spark 2.0.2와 kafka 010.2를 사용하고 있습니다.

답변

1

카프카 부트 스트랩 서버의 포트를 변경해야합니다. 좋아요 -

ds1 = spark 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .option("subscribe", "testtopic") 
    .option("startingOffsets" , "earliest") 
    .load() 

ds1.writeStream.format("console").start 

그러면 readStream에서 값을 가져올 수 있습니다.

도움이 되었기를 바랍니다.

+1

이제 작동합니다. 고마워. 사육사 포트는 사용하지 말고 브로커에 지정된 포트를 사용해야합니다. – cronoik