2016-07-05 6 views
0

나는 완전하게 작동하는 2 차 정렬을 구현 한 스칼라 프로그램을 가지고있다. 나는 그 프로그램을 작성했습니다 방법은 다음과 같습니다Secondar Sorting in spark in clojure/flambo

object rfmc { 
    // Custom Key and partitioner 

    case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double) 
    class RFMCPartitioner(partitions: Int) extends Partitioner { 
    require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.") 
    override def numPartitions: Int = partitions 
    override def getPartition(key: Any): Int = { 
     val k = key.asInstanceOf[RFMCKey] 
     k.cId.hashCode() % numPartitions 
    } 
    } 
    object RFMCKey { 
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = { 
     Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1)) 
    } 
    } 
    // The body of the code 
    // 
    // 
    val x = rdd.map(RFMCKey(cust,r,f,m,c), r+","+f+","+m+","+c) 
    val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)) 
} 

나는 불꽃이라고 flambo에 대한 Clojure에서의 DSL을 사용하여 같은 일을 구현하고 싶었다. 나는 clojure를 사용하여 파티셔너를 작성할 수 없기 때문에 위의 코드를 다시 사용하여 컴파일하고 Clojure 코드의 종속성으로 사용했습니다.

(ns xyz 
    (:import 
    [package RFMCPartitioner] 
    [package RFMCKey] 
    ) 
) 

하지만 (RFMCKey. cust_id r f m c)을 수행하여 RFMCKey을 만들려고 할 때, 그것은 다음과 같은 오류가 발생합니다 :

java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable 
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28) 
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153) 
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170) 
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164) 
    at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252) 
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110) 
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) 
    at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83) 
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687) 
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705) 
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64) 
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

는 이제 다음과 같은 방법으로 파티션 프로그램 내 Clojure의 코드에서 키를 가져입니다

내 생각 엔 그 파티션을 정의한 주문을 찾을 수 없다는 것입니다. 하지만 스칼라에서 작동한다면 Clojure에서 왜 작동하지 않는 걸까요?

답변

0

그래서 나는 마침내 그것을 알아 냈습니다. 기본적으로 커스텀 주문 함수를 별도의 스칼라 프로젝트로 작성한 다음이를 클로저에 호출해야했습니다.

내 스칼라 파일은 다음과 같은 방법으로 기입했다 :

import org.apache.spark.Partitioner 
import org.apache.spark.rdd.RDD 

case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double) 
class RFMCPartitioner(partitions: Int) extends Partitioner { 
    require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.") 
    override def numPartitions: Int = partitions 
    override def getPartition(key: Any): Int = { 
    val k = key.asInstanceOf[RFMCKey] 
    k.cId.hashCode() % numPartitions 
    } 
} 
object RFMCKey { 
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = { 
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1)) 
    } 
} 

class rfmcSort { 
    def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))], parts: Int): RDD[(RFMCKey, String)] = { 
    val x = a.map(v => v match { 
       case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal) 
      }).repartitionAndSortWithinPartitions(new RFMCPartitioner(parts)) 
    x 
    } 
} 

내가 ascala 프로젝트로 컴파일하고 내 Clojure의 코드에서이 방법을 사용 :

(:import [org.formcept.wisdom rfmcSort] 
     [org.apache.spark.rdd.RDD]) 

sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.) (.rdd rfmc-records) num_partitions)) 

내가 길을 발견하세요 내가 만든 rfmcSort 개체에서 sortWithRFMC 함수를 호출합니다. 또한 여기서 중요한 점은 JavaPairRDD을 스칼라 함수에 전달할 때 .rdd 메서드를 호출하여 먼저 spark RDD으로 변환해야한다는 것입니다. 그리고 클로저에서 함께 작업하려면 spark RDDJavaPairRDD으로 다시 변환해야합니다.