1

어떤 상황에서 Spark가 UDAF 기능의 일부로 병합을 수행하는지 알고 싶습니다.사용자 정의 집계 함수에서 병합은 언제 발생합니까?

동기 부여 : 내 Spark 프로젝트에서 많은 UDAF 기능을 사용하고 있습니다. 종종 다음과 같은 질문에 답하고 싶습니다 :

30 일 동안 현재 거래가 이루어진 국가에서 신용 카드 거래가 몇 번 있었습니까?

창이 현재 트랜잭션에서 시작되지만 카운트에 포함되지 않습니다. 지난 30 일 동안 어떤 국가를 셀 수 있는지 현재 거래의 가치가 필요합니다.

val rollingWindow = Window 
     .partitionBy(partitionByColumn) 
     .orderBy(orderByColumn.desc) 
     .rangeBetween(0, windowSize) 

df.withColumn(
    outputColumnName, 
    customUDAF(inputColumn, orderByColumn).over(rollingWindow)) 

저는 customUDAF를 작성하여 계산합니다. 항상 .orderBy(orderByColumn.desc)을 사용하고 .desc 덕분에 현재 트랜잭션이 계산 중에 창에서 첫 번째로 표시됩니다.

UDAF 함수는 병렬 계산에서 두 개의 중간 집계 버퍼를 병합하는 merge 함수의 구현이 필요합니다. 합병이 발생하면 내 current transaction은 다른 버퍼에서 동일하지 않을 수 있으며 UDAF의 결과는 올바르지 않습니다.

내 데이터 집합의 합병 횟수를 계산하고 현재 트랜잭션과 비교할 창에서 첫 번째 트랜잭션 만 유지하는 UDAF 함수를 작성했습니다.

class FirstUDAF() extends UserDefinedAggregateFunction { 

    def inputSchema = new StructType().add("x", StringType) 
    .add("y", StringType) 

    def bufferSchema = new StructType() 
    .add("first", StringType) 
    .add("numMerge", IntegerType) 

    def dataType = new StructType() 
    .add("firstCode", StringType) 
    .add("numMerge", IntegerType) 

    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
    buffer(0) = "" 
    buffer(1) = 1 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    if (buffer.getString(0) == "") 
     buffer(0) = input.getString(0) 

    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1) 
    } 

    def evaluate(buffer: Row) = buffer 
} 

저는 16 CPU와 로컬 마스터 스파크 2.0.1로 그것을 실행하면 윈도우의 모든 합병 및 첫 번째 트랜잭션이 결코 항상 현재의 트랜잭션 (transaction)이다. 이것이 내가 원하는거야. 가까운 미래에 x100의 더 큰 데이터 세트와 실제 배포 된 Spark 클러스터에서 코드를 실행하고 합병이 발생할 수 있는지 알고 싶습니다.

질문 : 환경/conditons 합병이 UDAF에서 개최

  • 하는?
  • Windows와 주문이 합병 한 적이 있습니까?
  • Spark에 합병을하지 말라고 말할 수 있습니까?

답변

1

UDAF에서 어떤 상황이 발생합니까? 집계 함수 ("지도 측 집계")의 일부 응용 프로그램은 셔플 후 병합 할 때

merge

는 ("측 집계를 감소")라고합니다.

주문과 Windows가 지금까지 합병 한 적이 있습니까?

현재 구현에서는입니다. 현재 윈도우 함수는 단지 groupByKey이며, 부분 집계는 없습니다. 이것은 물론 구현 세부 사항이며 추후 통지없이 변경 될 수 있습니다.

Spark에 합병을하지 말라고 할 수 있습니까?

아니요. 그러나 데이터가 이미 집계 키로 분할 된 경우 merge이 필요하지 않으며 combine 만 사용됩니다.

마지막 : 신용 카드 거래는 30 일 창에서 현재의 트랜잭션 (transaction)와 같은 나라에서 만들어진 몇 번

?

UDAFs 또는 창 기능을 호출하지 않습니다. 아마 o.a.s.sql.functions.window과 함께 번쩍이는 창을 만들고 사용자, 국가 및 창별로 집계하고 입력과 다시 결합 할 것입니다.

+0

해명 해 주셔서 감사합니다. 나는 너의 대답을 받아 들인다. 마지막으로 나는 내가 어떻게 할 것인지 잘 모르겠다. 정교하게 제발 주시겠습니까? 창별로 어떻게 집계합니까? 나는 사용자별로 파티션을 나누고, 현재 거래가 이루어진 국가 (창에 대한 현재, SQL의 current_row와 같은 현재)가 창에서 발생한 날짜 및 횟수 순으로 정렬합니다. 각 거래마다이 국가는 다릅니다. –