어떤 상황에서 Spark가 UDAF 기능의 일부로 병합을 수행하는지 알고 싶습니다.사용자 정의 집계 함수에서 병합은 언제 발생합니까?
동기 부여 : 내 Spark 프로젝트에서 많은 UDAF 기능을 사용하고 있습니다. 종종 다음과 같은 질문에 답하고 싶습니다 :
30 일 동안 현재 거래가 이루어진 국가에서 신용 카드 거래가 몇 번 있었습니까?
창이 현재 트랜잭션에서 시작되지만 카운트에 포함되지 않습니다. 지난 30 일 동안 어떤 국가를 셀 수 있는지 현재 거래의 가치가 필요합니다.
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
저는 customUDAF를 작성하여 계산합니다. 항상 .orderBy(orderByColumn.desc)
을 사용하고 .desc
덕분에 현재 트랜잭션이 계산 중에 창에서 첫 번째로 표시됩니다.
UDAF 함수는 병렬 계산에서 두 개의 중간 집계 버퍼를 병합하는 merge
함수의 구현이 필요합니다. 합병이 발생하면 내 current transaction
은 다른 버퍼에서 동일하지 않을 수 있으며 UDAF의 결과는 올바르지 않습니다.
내 데이터 집합의 합병 횟수를 계산하고 현재 트랜잭션과 비교할 창에서 첫 번째 트랜잭션 만 유지하는 UDAF 함수를 작성했습니다.
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = ""
buffer(1) = 1
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getString(0) == "")
buffer(0) = input.getString(0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
def evaluate(buffer: Row) = buffer
}
저는 16 CPU와 로컬 마스터 스파크 2.0.1로 그것을 실행하면 윈도우의 모든 합병 및 첫 번째 트랜잭션이 결코 항상 현재의 트랜잭션 (transaction)이다. 이것이 내가 원하는거야. 가까운 미래에 x100의 더 큰 데이터 세트와 실제 배포 된 Spark 클러스터에서 코드를 실행하고 합병이 발생할 수 있는지 알고 싶습니다.
질문 : 환경/conditons 합병이 UDAF에서 개최
- 하는?
- Windows와 주문이 합병 한 적이 있습니까?
- Spark에 합병을하지 말라고 말할 수 있습니까?
해명 해 주셔서 감사합니다. 나는 너의 대답을 받아 들인다. 마지막으로 나는 내가 어떻게 할 것인지 잘 모르겠다. 정교하게 제발 주시겠습니까? 창별로 어떻게 집계합니까? 나는 사용자별로 파티션을 나누고, 현재 거래가 이루어진 국가 (창에 대한 현재, SQL의 current_row와 같은 현재)가 창에서 발생한 날짜 및 횟수 순으로 정렬합니다. 각 거래마다이 국가는 다릅니다. –