2017-11-24 10 views
-1

scalatest로 Akka 스트림 애플리케이션을 테스트할 때 NullPointerException이 발생하는데 이유를 모르겠습니다. Akka 스트림에서 제가 뭔가 놓친 것 같습니다. Scala 2.12.4와 SBT 1.0.3을 사용하고 있으며, 코드는 scalatest의 일반적인 구조를 따릅니다.

제목: testkit 및 scalatest를 사용하는 Akka 스트림 테스트

다음은 제 애플리케이션입니다.

// Entry point that wires a throttled source of random CDRs through a JSON
// serialization flow into a MongoDB bulk-write sink.
//
// NOTE(review): this object extends `App`. In Scala 2.12 `App` relies on
// DelayedInit, so the statements in this body only run when `main` is invoked.
// Code that reads these vals from outside without `main` having run (e.g. a
// test) sees uninitialized (null) fields, which is consistent with the
// NullPointerException reported for the spec's `runWith` line.
// `msisdnLength`, `timeRange`, `throughput`, `bulkSize` and `collection` are
// not defined in this snippet; they come from code that is not shown.
object CdrToMongoReactiveStream extends App { 

    // Implicit infrastructure required to materialize and run the streams below.
    implicit val system = ActorSystem("cdr-data-generator") 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext=materializer.executionContext 
    // Presumably provides the JSON format used by `toJson` below - confirm.
    import RandomCdrJsonProtocol._ 

    // Endless source of freshly built RandomCdr values, rate-limited to
    // `throughput` elements per second (burst of 1, shaping mode).
    val randomCdrThrottledSource : Source[RandomCdr,NotUsed]= Source 
    .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange))) 
    .throttle(throughput,1.second,1,ThrottleMode.shaping) 
    .named("randomCdrThrottledSource") 

    // Serializes each RandomCdr to its JSON string representation.
    val cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= Flow[RandomCdr] 
    .map((cdr: RandomCdr) => cdr.toJson.toString()) 
    .named("cdrJsonParseFlow") 

    // Parses each JSON string into a Document, wraps it in an InsertOneModel,
    // batches `bulkSize` models together, issues one bulkWrite per batch and
    // discards the write results.
    val mongodbBulkSink : Sink[String,NotUsed] = Flow[String] 
    .map((json: String) => Document.parse(json)) 
    .map((doc: Document) => new InsertOneModel[Document](doc)) 
    .grouped(bulkSize) 
    .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒ 
     Source.fromPublisher(collection.bulkWrite(docs.toList.asJava)) 
    } 
    .to(Sink.ignore) 

    // Materializes and starts the whole pipeline: source -> JSON flow -> Mongo sink.
    val f = randomCdrThrottledSource.via(cdrJsonParseFlow).runWith(mongodbBulkSink) 
} 

그리고 다음은 테스트 파일입니다.

/** Specs for the stream stages exposed by `CdrToMongoReactiveStream`.
  *
  * NOTE(review): `CdrToMongoReactiveStream` extends `App`, whose body is only
  * executed when its `main` method runs (DelayedInit in Scala 2.12). Reading
  * its vals from this spec therefore yields uninitialized (null) references,
  * which is consistent with the NullPointerException reported at the
  * `runWith` call below.
  */
class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers {

  import RandomCdrJsonProtocol._

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      val future = CdrToMongoReactiveStream.randomCdrThrottledSource
        // line 30 in the error
        .runWith(Sink.head)(CdrToMongoReactiveStream.materializer)

      // Block on the first emitted element; acceptable at the edge of a test.
      val cdr = Await.result(future, 10.second)
      cdr shouldBe a [RandomCdr]
    }
  }

  "cdrJsonParseFlow" should {
    "parse RandomCdr to correct json format" in {
      val randomCdr = RandomCdr("+33612345678", 1511448336402L, "+33612345678", "SMS", "OUT", 0, 0, 0)

      // Drive the flow by hand: `pub` pushes elements in, `sub` observes what comes out.
      val (pub, sub) = TestSource.probe[RandomCdr]
        .via(CdrToMongoReactiveStream.cdrJsonParseFlow)
        .toMat(TestSink.probe[String])(Keep.both)
        .run()

      sub.request(1)
      pub.sendNext(randomCdr)
      // `shouldBe equal(...)` would compare the string with the matcher object
      // itself and always fail; compare with the expected value directly.
      sub.expectNext() shouldBe randomCdr.toJson.toString()
    }
  }
}

그리고 오류 메시지입니다

java.lang.NullPointerException was thrown. 
java.lang.NullPointerException 
    at CdrToMongoReactiveStreamSpec.$anonfun$new$2(CdrToMongoReactiveStreamSpec.scala:30) 
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) 
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) 
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) 
    at org.scalatest.Transformer.apply(Transformer.scala:22) 
    at org.scalatest.Transformer.apply(Transformer.scala:20) 
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078) 
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196) 
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195) 
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881) 
    at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076) 
    at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088) 
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289) 
    at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088) 
    at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070) 
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881) 
    at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147) 
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396) 
    at scala.collection.immutable.List.foreach(List.scala:389) 
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) 
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373) 
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410) 
    at scala.collection.immutable.List.foreach(List.scala:389) 
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) 
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379) 
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461) 
    at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147) 
    at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146) 
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881) 
    at org.scalatest.Suite.run(Suite.scala:1147) 
    at org.scalatest.Suite.run$(Suite.scala:1129) 
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881) 
    at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192) 
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521) 
    at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192) 
    at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190) 
    at org.scalatest.WordSpec.run(WordSpec.scala:1881) 
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45) 
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340) 
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334) 
    at scala.collection.immutable.List.foreach(List.scala:389) 
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334) 
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031) 
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010) 
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500) 
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010) 
    at org.scalatest.tools.Runner$.run(Runner.scala:850) 
    at org.scalatest.tools.Runner.run(Runner.scala) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) 
+0

아마 그중 하나가 null일 것입니다. 값을 출력해 보거나 IDE 디버깅 도구로 어느 것이 null인지 확인해 보셨나요? 원래 파일의 30행이 어느 줄인지 명확하지 않습니다. 실행한 뒤에 일부 줄을 제거하신 것 같습니다. 질문에 해당 줄을 표시해 주시거나, 중요하지 않은 줄이라면 그 줄들을 뺀 상태로 테스트를 다시 실행해서 유용한 스택 추적을 얻을 수 있을까요? –

+0

표시된 코드에서는 오류가 보이지 않습니다. NullPointerException은 `CdrToMongoReactiveStreamSpec.scala:30`을 가리키지만 코드 스니펫은 25행밖에 되지 않습니다. –

+0

죄송합니다. 질문을 올리기 위해 코드를 단순화했습니다. 30행은 `runWith(...)` 줄입니다. – vgkowski

답변

0

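메인 프로그램의 소스, 플로우, 싱크를 `extends App` 객체의 val이 아니라 메서드로 분리했습니다. (`App`을 상속한 객체의 본문은 `main`이 실행될 때에야 초기화되므로, 테스트에서 해당 val에 접근하면 아직 null이어서 NullPointerException이 발생합니다.)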

/** Building blocks of the CDR-to-MongoDB pipeline, exposed as factory methods
  * so that each stage can be created and exercised independently of `main`.
  */
object CdrToMongoReactiveStream {

  /** Builds an endless, rate-limited source of random CDRs.
    *
    * @param msisdnLength passed through to `RandomCdr(...)` for every element
    * @param timeRange    passed through to `RandomCdr(...)` for every element
    * @param throughput   number of elements allowed per second (burst of 1, shaping mode)
    * @return a source named "randomCdrThrottledSource"
    */
  def randomCdrThrottledSource(msisdnLength : Int,timeRange : Int, throughput : Int): Source[RandomCdr,NotUsed]= {
    // A new RandomCdr is built each time the iterator is pulled.
    val endlessCdrs = Source.fromIterator(() => Iterator.continually(RandomCdr(msisdnLength, timeRange)))

    endlessCdrs
      .throttle(throughput, 1.second, 1, ThrottleMode.shaping)
      .named("randomCdrThrottledSource")
  }

  /** Builds a flow that turns each `RandomCdr` into its JSON string.
    *
    * @return a flow named "cdrJsonParseFlow"
    */
  def cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= {
    // Presumably supplies the JSON format used by `toJson` - confirm.
    import RandomCdrJsonProtocol._

    val toJsonString = Flow[RandomCdr].map(_.toJson.toString())
    toJsonString.named("cdrJsonParseFlow")
  }

  /** Builds a sink that bulk-inserts JSON documents into `collection`.
    *
    * @param collection target collection receiving the bulk writes
    * @param bulkSize   number of documents grouped into one bulk write
    * @return a sink that discards the bulk-write results
    */
  def mongodbBulkSink(collection : MongoCollection[Document], bulkSize : Int) : Sink[String,NotUsed] = {
    // JSON string -> Document -> insert model, then batches of `bulkSize`.
    val insertBatches = Flow[String]
      .map(json => Document.parse(json))
      .map(doc => new InsertOneModel[Document](doc))
      .grouped(bulkSize)

    // One bulkWrite per batch, executed one after another.
    val bulkWrites = insertBatches.flatMapConcat { batch =>
      Source.fromPublisher(collection.bulkWrite(batch.toList.asJava))
    }

    bulkWrites.to(Sink.ignore)
  }

  /** Wires source, flow and sink together and starts the stream.
    *
    * `msisdnLength`, `timeRange`, `throughput`, `collection`, `bulkSize` and
    * `logger` are not defined in this snippet; they come from code not shown.
    */
  def main(args: Array[String]): Unit = {
    val jsonCdrs = randomCdrThrottledSource(msisdnLength, timeRange, throughput).via(cdrJsonParseFlow)
    val materialized = jsonCdrs.runWith(mongodbBulkSink(collection, bulkSize))

    // Logged right after the stream has been materialized; nothing is awaited here.
    logger.info("Generated random data")
  }
}

그리고 테스트 파일에서는 (30행에서) 값을 인자로 직접 전달합니다.

/** Specs for the stage factories of `CdrToMongoReactiveStream`.
  *
  * The spec owns its own `ActorSystem` and materializer, and shuts the actor
  * system down in `afterAll` so its threads do not outlive the test run.
  */
class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers with org.scalatest.BeforeAndAfterAll {

  import CdrToMongoReactiveStream._
  import RandomCdrJsonProtocol._

  implicit val system: ActorSystem = ActorSystem("cdr-data-generator")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Fixtures that no test in this snippet uses; presumably used by specs not shown here.
  val collection = new Fongo("mongo test server").getDB("cdrDB").getCollection("cdr")
  val randomCdr = RandomCdr("+33612345678", 1511448336402L, "+33612345678", "SMS", "OUT", 0, 0, 0)

  /** Terminates the actor system created by this spec once all tests have run. */
  override protected def afterAll(): Unit = {
    try Await.result(system.terminate(), 10.seconds)
    finally super.afterAll()
  }

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      // The arguments are passed explicitly, so the test does not depend on
      // any state initialized by the application's `main`.
      val future = CdrToMongoReactiveStream.randomCdrThrottledSource(8, 86400000, 1)
        .runWith(Sink.head)

      val cdr = Await.result(future, 5.second)
      cdr shouldBe a [RandomCdr]
    }
  }
}