2016-06-07 3 views
0

나는 RDD [행]을 가지고있다 dataframe에 RDD [행] 변환스파크 : 행의 열 중 하나가 각 행에 대해 다음과 같은 데이터 목록

[guid, List(peopleObjects)] 
["123", List(peopleObjects1, peopleObjects2, peopleObjects3)] 
나는이를 변환 할

dataframe
내 스키마 대신 StringType에 대해 서로 다른 데이터 유형을 사용하여 I이어야한다 다음 코드

val personStructureType = new StructType() 
    .add(StructField("guid", StringType, true)) 
    .add(StructField("personList", StringType, true)) 
val personDF = hiveContext.createDataFrame(personRDD, personStructureType) 

을 사용하고?

내 목록이 작동 단지 문자열 인 경우

하지만이 목록 나는 다음과 같은 오류 그것은 당신이 뭘 하려는지 전혀 분명하지 않다

scala.MatchError: List(personObject1, personObject2, personObject3) (of class scala.collection.immutable.$colon$colon) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445) 
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

어떤 종류의'peopleObject'입니까? 그것이 '사례 클래스'라면 그 정의를 포함 할 수 있습니까? 당신의'RDD'를 만드는 샘플 코드가 더 좋을 것입니다. –

답변

2

하지만 당신이 무엇을 할 수있는 더 좋은 방법을 얻을 때 그러면 case class을 만들고 RDD 행을 case class에 매핑 한 다음 toDF으로 전화하면됩니다. 같은

뭔가 :

case class MyClass(guid: Int, peopleObjects: List[String]) 

val rdd = sc.parallelize(Array((123,List("a","b")),(1232,List("b","d")))) 

val df = rdd.map(r => MyClass(r._1, r._2)).toDF 
df.show 
+----+-------------+ 
|guid|peopleObjects| 
+----+-------------+ 
| 123|  [a, b]| 
|1232|  [b, d]| 
+----+-------------+ 

또는 당신은 그것을 오랫동안 손 방법을 할 수 있지만이 같은 경우 클래스를 사용하지 않고 :

val df = sqlContext.createDataFrame(
    rdd.map(r => Row(r._1, r._2)), 
    StructType(Array(
    StructField("guid",IntegerType), 
    StructField("peopleObjects", ArrayType(StringType)) 
)) 
) 
+0

데이비드에게 감사드립니다. 이것은 제가 약간의 이해를 얻는데 도움이되었습니다. peopleObjects 클래스에는 name 및 location과 같은 속성이 있습니다. 유형으로 peopleObjects의 목록을 데이터 프레임에 전달할 수 있기를 바랬습니다. 그렇게하면 필자가 최종 출력 형식을 만들 때 개체를 분해하고 다시 작성할 필요가 없습니다. 지금 당장 peopleObject와 연관된 guid가 있습니다. guid에서 groupbykey를 사용하여 peopleObject의 목록을 가져옵니다. 나는 많은 테이블에서 같은 행동을 한 다음 guid를 통해 모두 참여할 계획이다. 그런 다음 특정 형식으로 최종 출력물을 만듭니다. –

+0

지금 막 json 객체를 만들고 문자열로 전달하고 모든 조인을 수행합니다. 그런 다음 객체를 다시 빌드하고 json을 수정하고 최종 출력을 만듭니다. –

+0

이것은 정확하고 유용한 답변입니다. @ 존 엔젤 하트 당신은 그것을 받아 들여야합니다. – Sim