2017-10-06 9 views
1

카프카에서 구조화 된 스트리밍을하려고합니다. HDFS에 검사 점을 저장할 계획입니다. Spark 스트리밍을 위해 HDFS에 체크 포인트를 저장하지 않도록 권장하는 cloudera 블로그를 읽었습니다. 구조체 스트리밍 체크 포인트와 동일한 문제입니까? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/.카프카 스트럭처 체크 포인트

구조화 된 스트리밍에서 스파크 프로그램이 특정 시간 동안 작동하지 않는 경우 체크 포인트 디렉토리에서 최신 오프셋을 가져오고 그 오프셋 후에 데이터를로드하는 방법은 무엇입니까? 다음과 같이 디렉터리에 검사 점을 저장합니다.

df.writeStream\ 
     .format("text")\ 
     .option("path", '\files') \ 
     .option("checkpointLocation", 'checkpoints\chkpt') \ 
     .start() 

업데이트 :

이 내 구조적 스트리밍 프로그램, 카프카 메시지를 읽고 압축 해제 HDFS에 기록합니다. 귀하의 요청에

df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", KafkaServer) \ 
     .option("subscribe", KafkaTopics) \ 
     .option("failOnDataLoss", "false")\ 
     .load() 
Transaction_DF = df.selectExpr("CAST(value AS STRING)") 
Transaction_DF.printSchema() 

decomp = Transaction_DF.select(zip_extract("value").alias("decompress")) 
#zip_extract is a UDF to decompress the stream 

query = decomp.writeStream\ 
    .format("text")\ 
    .option("path", \Data_directory_inHDFS) \ 
    .option("checkpointLocation", \pathinDHFS\) \ 
    .start() 

query.awaitTermination() 
+0

당신이 블로그는 HDFS에 체크 포인트를 저장하지 않도록 권장하는 확실한가요? 꽤 이상 하네. 링크가 있습니까? 구조화 된 스트리밍 질문의 경우 동일한 체크 포인트 디렉토리를 사용하여 동일한 코드를 실행하면 구조화 된 스트리밍이 마지막 실패 오프셋을 선택하고 다시 시작합니다. – zsxwing

+0

@zsxwing 이것은 cloudera 블로그 링크입니다. https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ 스트리밍 프로그램을 수동으로 수동으로 종료했습니다. 다시 시작하고 메시지가 처리 된 후에 만 ​​처리하기 시작했습니다. 그것은 다운되었을 때 누락 된 메시지를 무시하고 다시 처리하지 못했습니다. –

+0

드라이버 로그를보고'logInfo (s "GetBatch start = $ start, end = $ end")'에 의해 출력 된 로그를 찾을 수 있습니까? 조회가 처리 한 내용을 알려 주어야합니다. – zsxwing

답변

0

는 마루과 같은 몇 가지 형식으로 HDFS와 같은 일부 영구 저장소에 결과를 기록하는 동안 체크 포인트를 적용 해보십시오. 그것은 나를 위해 잘 작동했습니다.

코드를 공유하여 더 자세히 살펴볼 수 있습니까?

+0

질문에 업데이트로 전체 코드를 추가했습니다. 검사 점 파일에서 어떻게 최신 오프셋을 얻었습니까? –

+0

그렇게하면서 HDFS의 체크 포인트가 시간이 지남에 따라 더 많은 스토리지를 사용하는 것을 어떻게 막을 수 있습니까? 관리 할 때 사용할 수있는 "정리"구성이 있습니까? –

2

장기 저장소 (HDFS, AWS S3 등)에 Checkpoint를 저장하는 것이 가장 좋습니다. 나는 "failOnDataLoss"속성이 모범 사례가 아니므로 false로 설정해서는 안된다는 점을 여기에 하나 추가하고 싶습니다. 데이터 손실은 누구도 감당할 수없는 어떤 것입니다. 당신은 올바른 길에 있습니다.

+0

그렇게하면서 HDFS의 체크 포인트가 시간이 지남에 따라 더 많은 스토리지를 사용하는 것을 어떻게 막을 수 있습니까? 관리 할 때 사용할 수있는 "정리"구성이 있습니까? –

+0

내가 아는 한, 체크 포인트는 많은 데이터를 저장하지 않는다. 카프카처럼 오프셋을 저장하므로 저장소 문제에 대해 걱정할 필요가 없다. 체크 포인트를 지우고 싶은 경우에는 유지 관리 중에 수행하거나 넣을 수있다. 그것을위한 스케줄러. –

+0

SparkConf에서 "spark.cleaner.referenceTracking.cleanCheckpoints", "true"를 사용하고 있습니다. 깨끗한 체크 포인트로 작동합니다. –

0

나는 인위적으로 Hbase, Kafka, HDFS 또는 Zookeeper에서 오프셋 관리를 유지하는 것이 좋습니다. HDFS는 높은 대기 시간이로

는 "그것은 당신이 또한 HDFS와 같은 저장 시스템에서 오프셋을 저장할 수 있다는 언급 할 가치가있다. HDFS에서 오프셋을 저장하면 다른 에 비해 위의 옵션에 비해 덜 대중적인 접근 방식 입니다 ZooKeeper와 HBase 같은 시스템 "이라고 말했다.

어떻게에서 기존의 체크 포인트에서 쿼리를 다시 시작 스파크 문서에서 찾을 수 있습니다 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing