1
이 스칼라 코드를 작성하여 Spark DataFrame의 각 행에 대해 작업을 수행했습니다. 기본적으로 이들은 내가 수행하는 단계입니다루프의 출력을 데이터 프레임에 작성하십시오.
1. I convert the DataFrame into an array
2. Iterate through the array and perform calculations and get the output in an array
3. convert the output of the array to a dataframe and then make a Hive table.
2 단계에서 제가 백만 기록을 실행할 때 문제가 있습니다. 어쨌든 성능을 향상시킬 수 있습니까? 참고 자료 필자는 AFAIK 스파크 데이터 프레임을 반복 할 수 없기 때문에 데이터 프레임을 배열로만 변환합니다.
def getRows (ca : org.apache.spark.sql.DataFrame) =
{
val allca = List()
for (a <- ca.collect()) yield
{
val newAddress = a.getString(1)
val output = newAddress :: getRecursiveList(newAddress).reverse
val dataset =
CA (account.getInt(0),
account.getString(1),
account.getString(2),
output.toString)
dataset :: allca
}
}
val myArray = getRows(customerAccounts)
val OutputDataFrame = sc.parallelize(myArray.flatMap(x => x)).toDF
OutputDataFrame.show()
val resultsRDD = OutputDataFrame.registerTempTable("history")
spark.sql(""" insert into user_tech.history select * from history """).collect.foreach(println)
에 대해 다음 링크를 따라하세요? –
좋은 지적은 할 수 있습니다. 이 기능이 가능한지 알고 싶었습니다. – Srinivas
또한 inbuilt 함수를 참조하십시오. https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html 사용할 수있는 경우 그들. 그들은 udf보다 나은 성능을 제공해야합니다. 그러나 논리가 함수 중 하나를 통해 수행 될 수 없다면 udf 함수로 가야합니다. –