2017-11-27 4 views
0

하이브 테이블에서 델타를 Kafka로 게시하려고합니다. 문제의 테이블은 단일 파티션, 244 MB의 단일 블록 파일입니다. 우리 클러스터는 256M 블록 크기로 구성되어 있으므로이 경우 단일 파일의 최대 용량에 가깝습니다.Spark Dataframe leftanti Join Fail

테이블이 업데이트 될 때마다 복사본이 보관 된 다음 델타 프로세스가 실행됩니다.

아래의 함수에서 서로 다른 조인을 분리하여 내부 조인이 수용 가능하도록 (약 3 분) 수행되지만 두 개의 antijoin 데이터 프레임이 완료되지 않음을 확인했습니다 - 우리는 계속 스파크 작업에 더 많은 리소스를 던지고, 그러나 아래의 오류를 계속보고 있습니다.

이러한 종류의 결합에 대한 데이터 프레임 크기에 실질적인 제한이 있습니까?

private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset) 
    extends BasePublisher(spark, sink, source) with Serializable { 

    val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object 

    def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = { 

     val current = spark.read.parquet(dataLocation) 
     val previous = spark.read.parquet(archiveLocation) 

     val inserts = current.join(previous, keys, "leftanti") 
     val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn)) 
     val deletes = previous.join(current, keys, "leftanti") 

     val upsertCounter = spark.sparkContext.longAccumulator("upserts") 
     val deleteCounter = spark.sparkContext.longAccumulator("deletes") 

     logInfo("sending inserts to kafka") 
     sink.sendDeltasToKafka(inserts, "U", upsertCounter) 

     logInfo("sending updates to kafka") 
     sink.sendDeltasToKafka(updates, "U", upsertCounter) 

     logInfo("sending deletes to kafka") 
     sink.sendDeltasToKafka(deletes, "D", deleteCounter) 

     (upsertCounter.value, deleteCounter.value) 
    } 
    } 

우리가보고있는 오류는 드라이버가 실행 프로그램과의 접촉을 잃어 가고 있음을 나타내는 것으로 보인다. 우리는 실행 프로그램 메모리를 최대 24G까지 늘렸고 네트워크 시간 초과는 900 초, 하트 비트 간격은 최대 120 초로 늘 렸습니다. 로그에 나중에

17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval 
    at ... 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
    at ... 

는 :

17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts 
org.apache.spark.SparkException: Exception thrown in awaitResult 
    at ... 
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver. 

우리는 (성공없이) 조작 된 설정 스위치는 --executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s

답변

0

내가 생각하지 못했습니다 옵션은 내 드라이버 자원을 증가시키는 것이다있다. --driver-memory 4G--driver-cores 2을 추가하고 약 9 분 만에 작업을 완료했습니다.

이 두 파일의 내부 조인 (또는 내장 된 except() 메서드 사용)은 실행 프로그램에 메모리 부담을 가하는 것으로 보입니다. 키 열 중 하나에서 파티션을 나누면 메모리 사용량이 줄어들지 만 더 많은 시간이 소요되기 때문에 전체 시간이 늘어납니다.

이 두 파일간에 왼쪽/반대 결합을 수행하려면 더 많은 드라이버 리소스가 필요합니다. 기대하지 않았어.