2015-01-13 3 views
5

이 json 파일을 하이브 테이블로 읽으려고합니다. 최상위 키 즉 1,2 .. 여기서 일관되지 않습니다.hive/sql 및 spark로 json 키 - 값 읽기

{ 
    "1":"{\"time\":1421169633384,\"reading1\":130.875969,\"reading2\":227.138275}", 
    "2":"{\"time\":1421169646476,\"reading1\":131.240628,\"reading2\":226.810211}", 
    "position": 0 
} 

열은 위치를 무시하므로 내 하이브 테이블에서 시간과 판독 값 1,2 만 있으면됩니다. 하이브 쿼리와 스파크 맵 콤보 코드 조합을 수행 할 수도 있습니다. 도움 주셔서 감사합니다.

업데이트, 여기에 내가 그것은 다음과 같은 오류가 발생합니다

val hqlContext = new HiveContext(sc) 

val rdd = sc.textFile(data_loc) 

val json_rdd = hqlContext.jsonRDD(rdd) 
json_rdd.registerTempTable("table123") 
println(json_rdd.printSchema()) 
hqlContext.sql("SELECT json_val from table123 lateral view explode_map(json_map(*, 'int,string')) x as json_key, json_val ").foreach(println) 

을 시도하고 무엇을 : 당신이 "2"(키 이름)를 "1"의 이름을 변경하고있는 경우

Exception in thread "main" org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: SELECT json_val from temp_hum_table lateral view explode_map(json_map(*, 'int,string')) x as json_key, json_val 
    at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
+0

출력이 매우 유용 할 것 같은 예제가 있습니다. – gobrewers14

+0

출력 테이블의 예 :' "time", "reading1", "reading2"\ n 1421169633384, 130.875969, 227.138275 \ n 1421169646476, 131.240628, 226.810211' – venuktan

답변

4

이 작동 것을

val resultrdd = sqlContext.sql("SELECT x1.time, x1.reading1, x1.reading1, x2.time, x2.reading1, x2.reading2 from table123 ") 
resultrdd.flatMap(row => (Array((row(0),row(1),row(2)), (row(3),row(4),row(5))))) 

이 시간, reading1과 함께 튜플의 당신에게 RDD을 줄 것입니다 :합니다 (RDD JSON 파일 내부 또는) "1 개"와 "X2"를 reading2. 당신이 SchemaRDD 필요한 경우,이처럼 flatMap 변환 내부의 경우 클래스에 매핑 것 :

case class Record(time: Long, reading1: Double, reading2: Double) 
resultrdd.flatMap(row => (Array(Record(row.getLong(0),row.getDouble(1),row.getDouble(2)), 
     Record(row.getLong(3),row.getDouble(4),row.getDouble(5)) ))) 
val schrdd = sqlContext.createSchemaRDD(resultrdd) 

업데이트 : 많은 중첩 된 키의 경우

, 당신이 행을 구문 분석 할 수 다음과 같이하십시오 :

val allrdd = sqlContext.sql("SELECT * from table123") 
allrdd.flatMap(row=>{ 
    var recs = Array[Record](); 
    for(col <- (0 to row.length-1)) { 
     row(col) match { 
      case r:Row => recs = recs :+ Record(r.getLong(2),r.getDouble(0),r.getDouble(1)); 
      case _ => ; 
     } 
    }; 
    recs 
}) 
+0

키는 1,2 ... 240까지 계속됩니다. 그래서 x1.time 등을하면 효과가 없을 수 있습니다. – venuktan

+0

나는 내 대답을 – pzecevic

+0

업데이트했습니다. 여기에 제가'allrdd.registerTempTable ("vals"); sqlContext.sql ("vals LIMIT 10에서 reading1을 선택하십시오.") .collect.foreach (println)'나는 무엇을 놓치고 있습니까? – venuktan