2017-11-13 4 views
3

Spark 2.2에서 Spark Structured 스트리밍을 사용하여 HDFS 디렉토리에서 Kafka 토픽으로 파일을 스트리밍합니다. 내가 주제에 쓰고있는 데이터에 대한 카프카 오프셋을 포착하고 싶습니다.Spark Structured Streaming으로 작성시 카프카 오프셋 캡처

나는 카프카에 쓰려면

val write = jsonDF 
.writeStream.format("kafka") 
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint")) 
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer")) 
.option("topic", Config().getString(domain + ".kafkaTopic")) 
.start() 

을 사용하고 있습니다.

내가

spark.streams.addListener(new StreamingQueryListener() { 
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { 
    println("Query started: " + queryStarted.id) 
    } 
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { 
    println("Query terminated: " + queryTerminated.id) 
    } 
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { 
    println("Query made progress: " + queryProgress.progress) 
    } 
}) 

스트림의 진행 정보 오프셋은 카프카에서 생성되는과 상관없는 검색 정보를 캡처하는 데 사용합니다.

스트림에서 제공하는 정보가 실제로 내가 사용하고 있으며 카프카 (Kafka)로 작성된 것과 관련이없는 파일 스트림에 대한 것이기 때문에 이것이 원인이라고 생각합니다.

Kafka에 쓸 때 생성되는 오프셋 정보를 캡처하는 방법은 Spark Structure Streaming과 함께 있습니까?

추가 예 : 난 그냥 항목을 생성 한 후 세 개의 행과 소스 1 데이터를 실행하면 내가 얻을 :
실행 1 : 오프셋 시작 : 널 (null), 종료 오프셋 : { "logOffset"0} 시작 오프셋 : { "logOffset": 0}, 종료 오프셋 : { "logOffset": 0}

Kafka Says: 
ruwe:2:1 
ruwe:1:1 
ruwe:0:1 

실행 2;

Start Offset: {"logOffset":0}, End offset: {"logOffset":1} 
    Start Offset: {"logOffset":1}, End offset: {"logOffset":1} 

Kafka Says: 
ruwe:2:2 
ruwe:1:2 
ruwe:0:2 

실행 3 : I는 다른 소스에서 같은 프로그램에 데이터를 실행하고이 그 불꽃을 기반으로 정보를보고 있음을 나타냅니다

Start Offset: null, End offset: {"logOffset":0} 
    Start Offset: {"logOffset":0}, End offset: {"logOffset":0} 

    and of course Kafka continued to increment 

을받은

Start Offset: {"logOffset":1}, End offset: {"logOffset":2} 
    Start Offset: {"logOffset":2}, End offset: {"logOffset":2} 

Kafka Says: 
ruwe:2:3 
ruwe:1:3 
ruwe:0:3 

출처 :

대상에 무엇이 생성되었는지 알고 싶습니다.

답변

1

카프카에 쓸 때 생성되는 오프셋 정보 정보를 캡처하는 방법은 Spark Structure Streaming과 함께 있습니까?

onQueryProgress에, 당신은 Array[SourceProgress]StreamingQueryProgress.sources 볼 필요가있다.

sparkSession.streams.addListener(new StreamingQueryListener {override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ??? 

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { 
    val source = event.progress.sources.headOption 
    source.map(src => println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}")) 
    } 

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit =() 
}) 

를 JSON은 다음과 같은 구조를 가지고 : 특히 카프카 KafkaWriter 스파크 구조 스트리밍에 대한 코드를 읽은 후

"startOffset" : { 
    "topic-name" : { 
    "0" : 1, 
    "1" : 22, 
    "2" : 419, 
    } 
}, 
"endOffset" : { 
    "topic-name" : { 
    "0" : 10, 
    "1" : 100, 
    "2" : 1000 
    } 
} 
+0

이것은 대상 (카프카)이 아닌 소스에 대한 역방향 오프셋을보고 한 것으로 보입니다. 같은 소스에서 계속 실행하면 숫자가 올라가지만 두 번째 소스의 데이터를 실행하면 처음 숫자가보고되어 Kafka 오프셋이 아니라고 표시됩니다. – SRuwe

+0

@SRuwe 타겟 오프셋을 원하면'SinkProgress.json'을 살펴보십시오. –

0

을 그것은 당신이 구문 분석 할 수 JSONs는 두 개의 문자열, startOffsetendOffset을 가지고 , KafkaWriteTask 및 CachedKafkaProducer의 경우 Spark는 콜백에서 KafkaProducer에서 반환되는 오프셋을 사용하지 않습니다. 정의한 콜백은 예외 만 캡처합니다.이것을 바탕으로 현재 릴리스 2.2에서는 수행 할 수 없다고 말할 것입니다.

그들이 제공하는 정보는 모두 대상이 아닌 검색어의 출처입니다.