2016-08-09 10 views
0

Spark Context에 탭으로 구분 된 파일을로드해야합니다. 그러나 일부 필드는 값이 누락되어 있으며 이러한 행을 필터링해야합니다. 다음 코드를 사용하고 있습니다. 그러나 필드가 완전히 누락 된 경우 (예 : 행의 탭이 하나 더 적음)이 코드는 예외를 발생시킵니다. 이것을 달성하는 더 좋은 방법은 무엇입니까?Spark Context에 텍스트 파일을로드하는 동안 누락 된 필드가있는 행 건너 뛰기

val RDD = sc.textFile("file.txt").map(_.split("\t")) 
.filter(_(0).nonEmpty) 
.filter(_(1).nonEmpty) 
.filter(_(2).nonEmpty) 
.filter(_(3).nonEmpty) 
.filter(_(4).nonEmpty) 
.filter(_(5).nonEmpty) 
+0

나는 아마'flatMap'과'match'를 사용하십시오. 그렇게하면 동일한 단계에서 선을 처리 할 수도 있습니다. – Alec

답변

0

나는이 큰 데이터 세트 멋지게 일을 발견 :

val allRecords: RDD[Either[(String, String, String, String), Array[String]]] = sc.textFile(s"file.txt") 
    .map(x=>x.split("\t")) 
    .map { 
    case Array(name, address, phone, country) => Left(name, address, phone, country) 
    case badArray => Right(badArray) 
    } 

val goodRecords = allRecords.collect{ case Left(r) => r } 
0

우선은,이 ​​사항을 고려하십시오

scala> "1\t2\t\t4\t".split("\t") 
res0: Array[String] = Array(1, 2, "", 4) 

scala> "1\t2\t\t4\t".split("\t", -1) 
res1: Array[String] = Array(1, 2, "", 4, "") 

두 번째 인수로 음의 정수를 전달 후행 빈 요소가 손실되지 않습니다 보장합니다.

는 시도 (그것은 내 솔루션의 정확성을 위해 필수적이다) 다음

val RDD = sc.textFile("file.txt").map(_.split("\t", -1)) 
    .filter(array => array.forall(elem => elem.nonEmpty)) 
0

당신은 dataframe로 파일을 읽고 DataFrameNaFunctions

val df = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", "\t").load("file.txt") 
val cleanDF = df.na.drop() 

Here이 스파크에 대한 링크입니다 사용할 수 있습니다 경우에 따라 CSV 라이브러리.