2017-11-14 7 views
-1

데이터를 읽고 튜플에서 두 번째 요소를 선택하는 코드를 작성했습니다. 두 번째 요소는 JSON 일 수 있습니다. 내가 열과 행으로 marketplaceId, 고객 ID를 등과 같은 JSON 키가 데이터 프레임을 만들려면 지금Json 키를 Spark의 열로 변환하십시오.

{"data": {"marketplaceId":7,"customerId":123,"eventTime":1471206800000,"asin":"4567","type":"OWN","region":"NA"},"uploadedDate":1471338703958} 

: 아래

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.conf.Configuration; 
import com.amazon.traffic.emailautomation.cafe.purchasefilter.util.CodecAwareManifestFileSystem; 
import com.amazon.traffic.emailautomation.cafe.purchasefilter.util.CodecAwareManifestInputFormat; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import amazon.emr.utils.manifest.input.ManifestItemFileSystem; 
import amazon.emr.utils.manifest.input.ManifestInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat ; 
import scala.Tuple2; 

val configuration = new Configuration(sc.hadoopConfiguration); 
ManifestItemFileSystem.setImplementation(configuration); 
ManifestInputFormat.setInputFormatImpl(configuration, classOf[TextInputFormat]); 
val linesRdd1 = sc.newAPIHadoopFile("location", classOf[ManifestInputFormat[LongWritable,Text]], classOf[LongWritable], classOf[Text], configuration).map(tuple2 => tuple2._2.toString()); 

은 예입니다 코드는 JSON을 얻기 위해 그 가치를 지니고있다. 나는 이걸 어떻게 진행할 지 모르겠다. 누군가가 나를 얻을 수있는 포인터를 도울 수 있습니까?

답변

0

이 링크 https://coderwall.com/p/o--apg/easy-json-un-marshalling-in-scala-with-jackson

를 사용하여 정렬 화/비 정렬 화 JSON에 대한 스칼라 객체를 만든 다음 스칼라의 경우 클래스를 사용하여 JSON 데이터를 읽기 위해 해당 개체를 사용할 수 있습니다

import org.apache.spark.{SparkConf, SparkContext} 

object stackover { 
    case class Data(
        marketplaceId: Double, 
        customerId: Double, 
        eventTime: Double, 
        asin: String, 
        `type`: String, 
        region: String 
       ) 
    case class R00tJsonObject(
          data: Data, 
          uploadedDate: Double 
          ) 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf(true) 
    conf.setAppName("example"); 
    conf.setMaster("local[*]") 

    val sc = new SparkContext(conf) 
    val data = sc.textFile("test1.json") 
    val parsed = data.map(row => JsonUtil.readValue[R00tJsonObject](row)) 

    parsed.map(rec => (rec.data, rec.uploadedDate, rec.data.customerId, 
rec.data.marketplaceId)).collect.foreach(println) 
} 
} 

출력 :

(Data(7.0,123.0,1.4712068E12,4567,OWN,NA),1.471338703958E12,123.0,7.0)