2017-11-02 4 views
1

이 스칼라 코드를 작성하여 Spark DataFrame의 각 행에 대해 작업을 수행했습니다. 기본적으로 이들은 내가 수행하는 단계입니다루프의 출력을 데이터 프레임에 작성하십시오.

1. I convert the DataFrame into an array 
2. Iterate through the array and perform calculations and get the output in an array 
3. convert the output of the array to a dataframe and then make a Hive table. 

2 단계에서 제가 백만 기록을 실행할 때 문제가 있습니다. 어쨌든 성능을 향상시킬 수 있습니까? 참고 자료 필자는 AFAIK 스파크 데이터 프레임을 반복 할 수 없기 때문에 데이터 프레임을 배열로만 변환합니다.

def getRows (ca : org.apache.spark.sql.DataFrame) = 
{ 
    val allca = List() 
    for (a <- ca.collect()) yield 
    { 
    val newAddress = a.getString(1) 
    val output = newAddress :: getRecursiveList(newAddress).reverse 


    val dataset = 
CA (account.getInt(0), 
      account.getString(1), 
      account.getString(2), 
      output.toString) 

    dataset :: allca 
    } 
} 

val myArray = getRows(customerAccounts) 

val OutputDataFrame = sc.parallelize(myArray.flatMap(x => x)).toDF 

OutputDataFrame.show() 


val resultsRDD = OutputDataFrame.registerTempTable("history") 

spark.sql(""" insert into user_tech.history select * from history """).collect.foreach(println) 
+0

에 대해 다음 링크를 따라하세요? –

+0

좋은 지적은 할 수 있습니다. 이 기능이 가능한지 알고 싶었습니다. – Srinivas

+0

또한 inbuilt 함수를 참조하십시오. https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html 사용할 수있는 경우 그들. 그들은 udf보다 나은 성능을 제공해야합니다. 그러나 논리가 함수 중 하나를 통해 수행 될 수 없다면 udf 함수로 가야합니다. –

답변

0

몇 가지 기본을 이해하십시오

  1. 스파크 스칼라/자바 API는 매우 높은 수준의 관점을 제공하고 데이터 구조의 분산 성격의 아이디어를 제공하지 않습니다.
  2. 데이터 프레임 반복에는 두 가지 옵션이 있습니다. 하나는 분산 된 방식으로 반복하고 하나의 머신에서 모든 데이터를 수집 한 다음 반복하는 것입니다.
  3. ca.collect()는 모든 노드의 데이터 프레임에서 데이터를 수집하고 확장 가능한 솔루션이 아닌 처리 할 드라이버로 데이터를 가져옵니다.

    은 단지 UDF를 쓸 수 없습니다 이해

    1. http://bytepadding.com/big-data/spark/spark-code-analysis/
    2. http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/