2017-02-17 9 views
1

Apache Accark에서 사용자 정의 누산기를 사용하여 세트에 누적하려고합니다. 결과에는 Set [String] 유형이 있어야합니다. 이것에 대한 나는 사용자 정의 accumulator를 creat :사용자 정의 집합 누산기를 만드는 방법, 즉 [문자열]을 설정 하시겠습니까?

object SetAccumulatorParam extends AccumulatorParam[Set[String]] { 
    def addInPlace(r1: mutable.Set[String], r2: mutable.Set[String]): mutable.Set[String] = { 
     r1 ++= r2 
    } 

    def zero(initialValue: mutable.Set[String]): mutable.Set[String] = { 
     Set() 
    } 
} 

그러나이 유형의 변수를 인스턴스화 할 수 없습니다.

val tags = sc.accumulator(Set(""))(SetAccumulatorParam) 

오류가 발생합니다. 도움을 청하십시오. 1.6

required: org.apache.spark.AccumulatorParam[Set[String]] 
+0

은 공식 문서 (HTTP 상당히 다른 보인다. org/docs/latest/programming-guide.html # accumulators). 또한 Spark이이 누적기를 인스턴스화하려고한다고 가정 할 때 여기 개체의 사용법에 대해 회의적입니다. – LiMuBei

답변

1

업데이트 : 스파크 2.0

object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] { 
    def zero(initialValue: Set[String]): Set[String] = { Set() } 
    def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = { s1 ++ s2 } 
} 

val stringSetAccum = sc.accumulator(Set[String]())(StringSetAccumulatorParam) 
sc.parallelize(Array("1", "2", "3", "1")).foreach(s => stringSetAccum += Set(s)) 
stringSetAccum.value.toString 
res0: String = Set(2, 3, 1) 

당신은 아마 당신이 고유 한 값을 걱정하는 경우, 당신은 확인하고 그렇지 않은 경우에만 추가 할 수 있습니다 (기존 collectionAccumulator을 사용으로 괜찮아요) 존재 :

val collAcc = spark.sparkContext.collectionAccumulator[String]("myCollAcc") 
collAcc: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 32154, name: Some(myCollAcc), value: []) 

spark.sparkContext.parallelize(Array("1", "2", "3")).foreach(s => collAcc.add(s)) 

collAcc.value.toString 
res0: String = [3, 2, 1] 

상세 정보 : 트라이의에 추가 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2

0

대답, 여기에 일반적인 경우입니다 SetAccumulator for spark 2.x.

import org.apache.spark.util.AccumulatorV2 

class SetAccumulator[T](var value: Set[T]) extends AccumulatorV2[T, Set[T]] { 
    def this() = this(Set.empty[T]) 
    override def isZero: Boolean = value.isEmpty 
    override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T](value) 
    override def reset(): Unit = value = Set.empty[T] 
    override def add(v: T): Unit = value = value + v 
    override def merge(other: AccumulatorV2[T, Set[T]]): Unit = value = value ++ other.value 
} 

그리고 당신은 다음과 같이 사용할 수 있습니다 :

출력
val accum = new SetAccumulator[String]() 
spark.sparkContext.register(accum, "My Accum") // Optional, name it for SparkUI 

spark.sparkContext.parallelize(Seq("a", "b", "a", "b", "c")).foreach(s => accum.add(s)) 

accum.value 

: //spark.apache : 당신이 무슨 짓을

Set[String] = Set(a, b, c)