2017-10-09 5 views
0

스파크 작업 서버 API (2.2.0 용)를 사용하여 응용 프로그램을 빌드하려고합니다. 하지만 sparkSession과 함께 namedObject를 지원하지 않는다는 것을 알게되었습니다. 같은 내 외모 :spark 작업 서버에서 namedObject가있는 sparkSession을 지원하지 않는 이유는 무엇입니까?

import com.typesafe.config.Config 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel 
import org.scalactic._ 
import spark.jobserver.{NamedDataFrame, NamedObjectSupport, SparkSessionJob} 
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem} 

import scala.util.Try 

object word1 extends SparkSessionJob with NamedObjectSupport { 
    type JobData = Seq[String] 
    type JobOutput = String 

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = 
{ 
    val df = sparkSession.sparkContext.parallelize(data) 
    val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY) 
    this.namedObjects.update("df1", ndf) 
    this.namedObjects.getNames().toString 
} 


def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config): 
    JobData Or Every[ValidationProblem] = { 
Try(config.getString("input.string").split(" ").toSeq) 
    .map(words => Good(words)) 
    .getOrElse(Bad(One(SingleProblem("No input.string param")))) 
    } 

} 

하지만 라인 this.namedObjects.update에서() 오류가 있습니다. 나는 그들이 namedObject를 지원하지 않는다고 생각한다. 동일한 코드가 SparkJob로 컴파일하는 동안 :

object word1 extends SparkJob with NamedObjectSupport 

은 sparksession와 namedObjects의 지원이 있습니까? 그렇다면 데이터 프레임/데이터 세트를 유지하기 위해 해결해야 할 것은 무엇입니까?

답변

0

나는 그것을 알아 냈다. 그것은 나의 편에서 어리석은 실수이었다. https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-api/src/main/scala/spark/jobserver/NamedObjectSupport.scala#L138에서. 그것이라고 :

// NamedObjectSupport는 더 이상 api.SparkJobBase의 JobEnvironment 때문에 필요하지 않습니다. 또한 // 호환을 위해 이전 spark.jobserver.SparkJobBase에 자동으로 가져 왔습니다.

import com.typesafe.config.Config 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel 
import org.scalactic._ 
import spark.jobserver.{NamedDataFrame, NamedObjectSupport, SparkSessionJob} 
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem} 

import scala.util.Try 

object word1 extends SparkSessionJob with NamedObjectSupport { 
    type JobData = Seq[String] 
    type JobOutput = String 

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = 
    { 
    val df = sparkSession.sparkContext.parallelize(data) 
    val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY) 
    runtime.namedObjects.update("df1", ndf) 
    runtime.namedObjects.getNames().toString 
    } 


def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config): 
    JobData Or Every[ValidationProblem] = { 
Try(config.getString("input.string").split(" ").toSeq) 
    .map(words => Good(words)) 
    .getOrElse(Bad(One(SingleProblem("No input.string param")))) 
    } 

} 
:

@Deprecated 
trait NamedObjectSupport 

따라서 우리는이 코드를 수정해야이 기능에 액세스 할 수