많은 신호가있는 DataFrame이 있고이를지도로 변환하고 싶습니다. [String, List [String]]데이터 프레임을 스칼라/스파크의 [String, List [String]] 맵으로 변환하는 것을 어떻게 최적화 할 수 있습니까?
코드를 실행하고 있지만 실행하는 데 시간이 오래 걸리는 문제가 있습니다. . 100 개의 신호 만 들어 있으면 약 13 분이 필요합니다.
+----------+-----+
|SignalName|Value|
+----------+-----+
| S1| V1|
| S1| V2|
| S1| V3|
| S2| V1|
| S2| V2|
| S3| V1|
+----------+-----+
:
+----------+-----+
|SignalName|Value|
+----------+-----+
| S1| V1|
| S2| V1|
| S1| V2|
| S2| V2|
| S3| V1|
| S1| V3|
| S1| V1|
+----------+-----+
그럼 내가 중복
var reducedDF = inputDataFrame.select("SignalName","Value").dropDuplicates()
reduedDF.show의 OUPUT를 필터링 할 :
이것은 내가 처음에받은 inputDataFrame (예)입니다
다음 단계는 중복없이 SignalNames의 RDD를 얻는 것입니다. 나중에 zipWithIndex()를 사용했습니다. 왜냐하면 나중에 RDD의 모든 값을 읽고 싶기 때문입니다.
var RDDOfSignalNames = reducedDF.select("SignalName").rdd.map(r => r(0).asInstanceOf[String])
RDDOfSignalNames = RDDOfSignalNames.distinct()
val RDDwithIndex = RDDOfSignalNames.zipWithIndex()
val indexKey = RDDwithIndex.map { case (k, v) => (v, k) }
를 그리고 이제 마지막 단계는 유형 목록 [문자열]의 목록으로 모든 SignalName을 위해 가능한 모든 값을 얻을하고지도에 추가하는 것입니다 : 나는 다음과 같은 코드를 사용하여이 작업을 수행 할 수
결국var dataTmp: DataFrame = null
var signalname = Seq[String]("")
var map = scala.collection.mutable.Map[String, List[String]]()
for (i <- 0 to (RDDOfSignalNames.count()).toInt - 1) {
signalname = indexKey.lookup(i)
dataTmp = reducedDF.filter(data.col("Signalname").contains(signalname(0)))
map += (signalname(0) -> dataTmp.rdd.map(r => r(0).asInstanceOf[String]).collect().toList)
println(i+"/"+(RDDOfSignalNames.count().toInt - 1).toString())
}
이지도는 다음과 같습니다
scala.collection.mutable.Map[String,List[String]] = Map(S1 -> List(V1, V2, V3), S3 -> List(V1), S2 -> List(V1, V2))
문제는이 약 13 분 소요 (106 개) 신호의 라인지도 + = ...입니다! 이 작업을 수행하는보다 효율적인 방법이 있습니까?
입니다 정의되지 않은 함수 collect_list을; – LUIGI
'import org.apache.spark.sql.functions.collect_list'을 (를) 가져 왔습니까? –
예, 해봤지만 스파크 1.6.2를 사용합니다! 나는 – LUIGI