2017-12-07 12 views
0

저는 SparkSQL/Scala를 처음 접했고 몇 가지 간단하고 간단한 작업에 어려움을 겪고 있습니다.Spark 2.2 스칼라 DataFrame 문자열 배열에서 선택하여 오류 잡기

스칼라 문자열 배열에서 일부 동적 SQL을 작성하려고합니다. DataFrame에서 일부 열을 다시 입력하려고 시도하지만 DataFrame에서 열 집합을 볼 수있는 런타임까지 다시 입력해야하는 항목을 정확히 알지 못합니다. 그래서 나는이 일을하려고 해요 :

["a", "cast(b as int) b", "c"] 

내가 그에서 처음으로 큰 쉼표로 구분 된 문자열을 작성해야합니까 :

val cols = df.columns 
val typedCols = cols.map(c => getTypedColumn(c)) 
df.select(...) or df.selectExpr(...) // how to invoke this with vals from my string array?? 

typedCols이 같은 값을 문자열 배열 끝나게됩니다 정렬?

이렇게하면이 select 문을 호출하여 원하는 DataFrame을 원하는 DataFrame으로 변환 할 수 있습니다. 그러나 DataFrame에있는 일부 레코드에는 오류가 있으며 다시 입력 시도가 실패합니다.

입력을 통과 한 모든 좋은 레코드로 DataFrame 결과를 얻은 다음 모든 종류의 오류 레코드를 오류 버킷에 던지려면 어떻게해야합니까? DataFrame select를 시도하기 전에 유효성 검사 패스를 먼저 수행해야합니까?

답변

1

당신은 그냥 사용할 수있는 가변 인수 : I는 입력을 통과하고 모든 좋은 기록으로 DataFrame 결과를 얻을 것 어떻게

val typedCols = Array($"a", $"b" cast "int", $"c") 
df.select(typedCols: _*).show 

:

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c") 
val typedCols = Array("a", "cast(b as int) b", "c") 
df.selectExpr(typedCols: _*).show 

+---+----+---+ 
| a| b| c| 
+---+----+---+ 
| a| 1| c| 
|foo|null|baz| 
+---+----+---+ 

하지만 개인적으로 나는 열을 선호 그런 다음 모든 오류 레코드를 오류 버킷에 던집니다.

cast의 데이터는 NULL입니다. 어떤 NULL

import org.apache.spark.sql.functions.col 

val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _)) 

은 타의 추종을 불허하는 데이터를 얻을 수있는 경우

val result = df.selectExpr(typedCols: _*) 
val good = result.na.drop() 

가 부도 수표를 찾으려면 : 좋은 기록이 na.drop를 사용 찾으려면 Seq[Column] 할 수 있습니다

있습니다

  • typedCols 경우

    df.where(typedCols.map(_.isNull).reduce(_ || _)) 
    
  • typedColsSeq[String] 있습니다
  • 경우에 당신은 할 수 있습니다

    import org.apache.spark.sql.functions.expr 
    
    df.where(typedCols.map(expr(_).isNull).reduce(_ || _))