2016-11-18 2 views
2

S3 폴더로 스트리밍 된 xml 파일을 처리해야한다는 요구 사항이 있습니다. 현재, 나는 그것을 다음과 같이 구현했다.Spark Streaming XML 파일

첫째, 모든 파일이 새로운 HDFS 디렉토리

data.coalesce(1).saveAsTextFile(sdir); 
에 문자열을 쓰기
if (data.count() !=0) 

을 읽은 경우 스파크의 FILESTREAM 각 RDD를 들어

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

을 확인하여 파일 읽기

위의 HDFS에서 읽은 데이터 프레임 만들기 di

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir) 

사제관

는 Dataframe에 일부 처리를 수행하고 JSON

loaddata.write.mode("append").json("s3://mybucket/somefolder") 

은 어떻게 든, 나는 위의 접근 방식은 매우 비효율적이고 솔직히 꽤 학교 소년 같은 느낌 저장합니다. 더 좋은 솔루션이 있습니까? 어떤 도움이라도 대단히 감사하겠습니다.

추가 질문 : 데이터 프레임의 필드 (열 제외)를 조작하는 방법은 무엇입니까? 나는 vey complex nested xml을 가지고 있는데 위에서 설명한 메서드를 사용할 때 Dataframe에 9 개의 열과 50 개의 내부 구조체 배열이 있습니다. 특정 필드 이름을 줄여야하는 경우를 제외하고는 괜찮습니다. 같은 구조를 다시 구성해야하므로 데이터 프레임을 폭발시키지 않고도이를 달성 할 수있는 방법이 있습니까?

답변

1

당신이 스파크 2.0을 사용하는 경우가 구조화 된 스트리밍 작업을 할 수 있습니다 :

val inputDF = spark.readStream.format("com.databricks.spark.xml") 
    .option("rowTag", "Trans") 
    .load(path) 
+0

덕분에 많이. 내 타겟 env는 Spark 2.0.1과 함께 EMR 스택입니다. EMR 상자에 당신의 제안을 시도 할 것입니다. – Vamsi

+0

pls vote-up/accept 당신이 위에서 언급 한 해결책을 가지고 있다면 괜찮습니다. –