2017-02-07 4 views
3

내가 "com.holdenkarau.spark 테스트 기반"와 scalatest의 도움으로 불꽃 스트리밍 응용 프로그램을 테스트하고 있습니다. 내가 자리에 문자열 값으로 'DELIM'을 교체 할 경우 잘 작동,

org.apache.spark.SparkException: Task not serializable 
[info] at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
[info] at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
[info] at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
[info] at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
[info] at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324) 
[info] at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323) 
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
[info] at org.apache.spark.rdd.RDD.map(RDD.scala:323) 
[info] ... 
[info] Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper 
[info] Serialization stack: 
[info] - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: [email protected]) 
[info] - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper) 

:

import com.holdenkarau.spark.testing.StreamingSuiteBase 
import org.apache.spark.rdd.RDD 
import org.scalatest.{ BeforeAndAfter, FunSuite } 

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase { 

    var delim: String = "," 

    before { 
    System.clearProperty("spark.driver.port") 
    } 

    test(“This Fails“) { 

    val source = scala.io.Source.fromURL(getClass.getResource(“/some_logs.csv")) 
    val input = source.getLines.toList 

    val rowRDDOut = Calculator.do(sc.parallelize(input)) //Returns DataFrame 

    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1)) 

    source.close 
    } 
} 

내가 필드 'DELIM'에 대한 직렬화 예외를 얻을.

val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + “,” + row.getAs[String](1)) 

첫 번째 버전과 두 번째 버전의 차이점은 무엇입니까?

미리 감사드립니다.

답변

4

문제는 delim (문자열) 유형이 아니며 delim입니다.

test() 메소드 외부에서 변수를 정의하지 마십시오. test 안에 delm을 정의하면 제대로 작동합니다.

test(“This Fails“) { 
    val delim = "," 
    ... 
} 

이제 왜 그런지 물어볼 수 있습니까? 자, delim을 외부 범위에서 참조 할 때 Scala는 둘러싸는 객체 class Test을 가져 오려고 시도합니다. 이 객체에는 org.scalatest.Assertions$AssertionsHelper에 대한 참조가 포함되어 있습니다 (스택 추적 참조).

+1

우와! 그것을 생각조차 할 수 없었다! 감사! 매력처럼 일했습니다. –