카프카에서 구조화 된 스트리밍을하려고합니다. HDFS에 검사 점을 저장할 계획입니다. Spark 스트리밍을 위해 HDFS에 체크 포인트를 저장하지 않도록 권장하는 cloudera 블로그를 읽었습니다. 구조체 스트리밍 체크 포인트와 동일한 문제입니까? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/.카프카 스트럭처 체크 포인트
구조화 된 스트리밍에서 스파크 프로그램이 특정 시간 동안 작동하지 않는 경우 체크 포인트 디렉토리에서 최신 오프셋을 가져오고 그 오프셋 후에 데이터를로드하는 방법은 무엇입니까? 다음과 같이 디렉터리에 검사 점을 저장합니다.
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
업데이트 :
이 내 구조적 스트리밍 프로그램, 카프카 메시지를 읽고 압축 해제 HDFS에 기록합니다. 귀하의 요청에
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
당신이 블로그는 HDFS에 체크 포인트를 저장하지 않도록 권장하는 확실한가요? 꽤 이상 하네. 링크가 있습니까? 구조화 된 스트리밍 질문의 경우 동일한 체크 포인트 디렉토리를 사용하여 동일한 코드를 실행하면 구조화 된 스트리밍이 마지막 실패 오프셋을 선택하고 다시 시작합니다. – zsxwing
@zsxwing 이것은 cloudera 블로그 링크입니다. https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ 스트리밍 프로그램을 수동으로 수동으로 종료했습니다. 다시 시작하고 메시지가 처리 된 후에 만 처리하기 시작했습니다. 그것은 다운되었을 때 누락 된 메시지를 무시하고 다시 처리하지 못했습니다. –
드라이버 로그를보고'logInfo (s "GetBatch start = $ start, end = $ end")'에 의해 출력 된 로그를 찾을 수 있습니까? 조회가 처리 한 내용을 알려 주어야합니다. – zsxwing