Spark 2.2에서 Spark Structured 스트리밍을 사용하여 HDFS 디렉토리에서 Kafka 토픽으로 파일을 스트리밍합니다. 내가 주제에 쓰고있는 데이터에 대한 카프카 오프셋을 포착하고 싶습니다.Spark Structured Streaming으로 작성시 카프카 오프셋 캡처
나는 카프카에 쓰려면
val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
.option("topic", Config().getString(domain + ".kafkaTopic"))
.start()
을 사용하고 있습니다.
내가
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
스트림의 진행 정보 오프셋은 카프카에서 생성되는과 상관없는 검색 정보를 캡처하는 데 사용합니다.
스트림에서 제공하는 정보가 실제로 내가 사용하고 있으며 카프카 (Kafka)로 작성된 것과 관련이없는 파일 스트림에 대한 것이기 때문에 이것이 원인이라고 생각합니다.
Kafka에 쓸 때 생성되는 오프셋 정보를 캡처하는 방법은 Spark Structure Streaming과 함께 있습니까?
추가 예 : 난 그냥 항목을 생성 한 후 세 개의 행과 소스 1 데이터를 실행하면 내가 얻을 :
실행 1 : 오프셋 시작 : 널 (null), 종료 오프셋 : { "logOffset"0} 시작 오프셋 : { "logOffset": 0}, 종료 오프셋 : { "logOffset": 0}
Kafka Says:
ruwe:2:1
ruwe:1:1
ruwe:0:1
실행 2;
Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
Start Offset: {"logOffset":1}, End offset: {"logOffset":1}
Kafka Says:
ruwe:2:2
ruwe:1:2
ruwe:0:2
실행 3 : I는 다른 소스에서 같은 프로그램에 데이터를 실행하고이 그 불꽃을 기반으로 정보를보고 있음을 나타냅니다
Start Offset: null, End offset: {"logOffset":0}
Start Offset: {"logOffset":0}, End offset: {"logOffset":0}
and of course Kafka continued to increment
을받은
Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
Start Offset: {"logOffset":2}, End offset: {"logOffset":2}
Kafka Says:
ruwe:2:3
ruwe:1:3
ruwe:0:3
출처 :
대상에 무엇이 생성되었는지 알고 싶습니다.
이것은 대상 (카프카)이 아닌 소스에 대한 역방향 오프셋을보고 한 것으로 보입니다. 같은 소스에서 계속 실행하면 숫자가 올라가지만 두 번째 소스의 데이터를 실행하면 처음 숫자가보고되어 Kafka 오프셋이 아니라고 표시됩니다. – SRuwe
@SRuwe 타겟 오프셋을 원하면'SinkProgress.json'을 살펴보십시오. –