2017-12-06 7 views
0

많은 신호가있는 DataFrame이 있고이를지도로 변환하고 싶습니다. [String, List [String]]데이터 프레임을 스칼라/스파크의 [String, List [String]] 맵으로 변환하는 것을 어떻게 최적화 할 수 있습니까?

코드를 실행하고 있지만 실행하는 데 시간이 오래 걸리는 문제가 있습니다. . 100 개의 신호 만 들어 있으면 약 13 분이 필요합니다.

+----------+-----+ 
|SignalName|Value| 
+----------+-----+ 
|  S1| V1| 
|  S1| V2| 
|  S1| V3| 
|  S2| V1| 
|  S2| V2| 
|  S3| V1| 
+----------+-----+ 
:

+----------+-----+ 
|SignalName|Value| 
+----------+-----+ 
|  S1| V1| 
|  S2| V1| 
|  S1| V2| 
|  S2| V2| 
|  S3| V1| 
|  S1| V3| 
|  S1| V1| 
+----------+-----+ 

그럼 내가 중복

var reducedDF = inputDataFrame.select("SignalName","Value").dropDuplicates() 

reduedDF.show의 OUPUT를 필터링 할 :

이것은 내가 처음에받은 inputDataFrame (예)입니다

다음 단계는 중복없이 SignalNames의 RDD를 얻는 것입니다. 나중에 zipWithIndex()를 사용했습니다. 왜냐하면 나중에 RDD의 모든 값을 읽고 싶기 때문입니다.

var RDDOfSignalNames = reducedDF.select("SignalName").rdd.map(r => r(0).asInstanceOf[String]) 
RDDOfSignalNames = RDDOfSignalNames.distinct() 
val RDDwithIndex = RDDOfSignalNames.zipWithIndex() 
val indexKey = RDDwithIndex.map { case (k, v) => (v, k) } 

를 그리고 이제 마지막 단계는 유형 목록 [문자열]의 목록으로 모든 SignalName을 위해 가능한 모든 값을 얻을하고지도에 추가하는 것입니다 : 나는 다음과 같은 코드를 사용하여이 작업을 수행 할 수

결국
var dataTmp: DataFrame = null 
var signalname = Seq[String]("") 
var map = scala.collection.mutable.Map[String, List[String]]() 

for (i <- 0 to (RDDOfSignalNames.count()).toInt - 1) { 

    signalname = indexKey.lookup(i) 

    dataTmp = reducedDF.filter(data.col("Signalname").contains(signalname(0)))   

    map += (signalname(0) -> dataTmp.rdd.map(r => r(0).asInstanceOf[String]).collect().toList) 
    println(i+"/"+(RDDOfSignalNames.count().toInt - 1).toString()) 

} 

이지도는 다음과 같습니다

scala.collection.mutable.Map[String,List[String]] = Map(S1 -> List(V1, V2, V3), S3 -> List(V1), S2 -> List(V1, V2)) 

문제는이 약 13 분 소요 (106 개) 신호의 라인지도 + = ...입니다! 이 작업을 수행하는보다 효율적인 방법이 있습니까?

답변

1

우선 varscala에 사용하지 않는 것이 좋습니다. 항상 불변의 변수을 사용해보십시오. 그래서

val reducedDF = inputDataFrame.select("SignalName","Value").distinct() 

에 다음 행

var reducedDF = inputDataFrame.select("SignalName","Value").dropDuplicates() 

을 변경하는 것이 바람직하다.

그리고, 당신이 원하는 결과를 얻기 위해 이러한 복잡성을 통해 갈 필요가 없습니다

. 당신은 원하는 출력은 다음과

import org.apache.spark.sql.functions.collect_list 
reducedDF 
     .groupBy("SignalName") 
     .agg(collect_list($"Value").as("Value")) 
     .rdd 
     .map(row => (row(0).toString -> row(1).asInstanceOf[scala.collection.mutable.WrappedArray[String]].toList)) 
     .collectAsMap() 

,
reducedDF.groupBy("SignalName").agg(collect_list($"Value").as("Value")) 당신에게 제공하고 dataframe.rdd.map(row => (row(0).toString -> row(1).asInstanceOf[scala.collection.mutable.WrappedArray[String]].toList)).collectAsMap() 그냥 원하는 출력 Mapdataframe 변환되는 코드의 나머지

+----------+------------+ 
|SignalName|Value  | 
+----------+------------+ 
|S3  |[V1]  | 
|S2  |[V2, V1] | 
|S1  |[V1, V2, V3]| 
+----------+------------+ 

로를 얻을 수 있습니다.스레드 "주요"org.apache.spark.sql.AnalysisException에서 예외 :

최종지도 출력 내가 오류 메시지가있어 지금

Map(S1 -> List(V1, V2, V3), S3 -> List(V1), S2 -> List(V2, V1)) 
+0

입니다 정의되지 않은 함수 collect_list을; – LUIGI

+0

'import org.apache.spark.sql.functions.collect_list'을 (를) 가져 왔습니까? –

+0

예, 해봤지만 스파크 1.6.2를 사용합니다! 나는 – LUIGI