나는 완전하게 작동하는 2 차 정렬을 구현 한 스칼라 프로그램을 가지고있다. 나는 그 프로그램을 작성했습니다 방법은 다음과 같습니다Secondar Sorting in spark in clojure/flambo
object rfmc {
// Custom Key and partitioner
case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
class RFMCPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.")
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[RFMCKey]
k.cId.hashCode() % numPartitions
}
}
object RFMCKey {
implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
}
}
// The body of the code
//
//
val x = rdd.map(RFMCKey(cust,r,f,m,c), r+","+f+","+m+","+c)
val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
}
나는 불꽃이라고 flambo에 대한 Clojure에서의 DSL을 사용하여 같은 일을 구현하고 싶었다. 나는 clojure를 사용하여 파티셔너를 작성할 수 없기 때문에 위의 코드를 다시 사용하여 컴파일하고 Clojure 코드의 종속성으로 사용했습니다.
(ns xyz
(:import
[package RFMCPartitioner]
[package RFMCKey]
)
)
하지만 (RFMCKey. cust_id r f m c)
을 수행하여 RFMCKey
을 만들려고 할 때, 그것은 다음과 같은 오류가 발생합니다 :
java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
을
는 이제 다음과 같은 방법으로 파티션 프로그램 내 Clojure의 코드에서 키를 가져입니다
내 생각 엔 그 파티션을 정의한 주문을 찾을 수 없다는 것입니다. 하지만 스칼라에서 작동한다면 Clojure에서 왜 작동하지 않는 걸까요?