2017-10-10

Akka streams with Spark Streaming: messages are not delivered to the actor; getting dead letters

I am new to Akka streaming. I am running the example below from GitHub, but the messages I send to the "Helloer" actor are never received or displayed on the output console.

StreamingApp.scala

import _root_.akka.actor.{ Actor, Props }
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.akka.{ ActorReceiver, AkkaUtils }

class Helloer extends ActorReceiver {
  override def preStart() = {
    println("")
    println("=== Helloer is starting up ===")
    println(s"=== path=${context.self.path} ===")
    println("")
  }
  def receive = {
    // store() method allows us to store the message so Spark Streaming knows about it
    // This is the integration point (from Akka's side) between Spark Streaming and Akka
    case s => store(s)
  }
}

object StreamingApp {
  def main(args: Array[String]) {
    // Configuration for a Spark application.
    // Used to set various Spark parameters as key-value pairs.
    val driverPort = 7777
    val driverHost = "localhost"
    val conf = new SparkConf()
      .setMaster("local[*]") // run locally with as many threads as CPUs
      .setAppName("Spark Streaming with Scala and Akka") // name in web UI
      .set("spark.logConf", "true")
      .set("spark.driver.port", driverPort.toString)
      .set("spark.driver.host", driverHost)
    val ssc = new StreamingContext(conf, Seconds(10))

    val actorName = "helloer"

    // This is the integration point (from Spark's side) between Spark Streaming and the Akka system
    // It's expected that the actor we're now instantiating will `store` messages (to close the integration loop)
    val actorStream = AkkaUtils.createStream[String](ssc, Props[Helloer](), actorName)

    // describe the computation on the input stream as a series of higher-level transformations
    actorStream.reduce(_ + " " + _).print()

    // Custom receiver
    import pl.japila.spark.streaming.CustomReceiverInputDStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    val input: ReceiverInputDStream[String] = ssc.receiverStream[String](CustomReceiverInputDStream(StorageLevel.NONE))
    input.print()

    // Data ingestion from Kafka
    import org.apache.spark.streaming.kafka._

    // start the streaming context so the data can be processed
    // and the actor gets started
    ssc.start()

    // FIXME wish I knew a better way to handle the asynchrony
    java.util.concurrent.TimeUnit.SECONDS.sleep(3)

    import _root_.akka.actor.ActorSystem
    val actorSystem = ActorSystem("SparkStreamingAkka")

    val url = s"akka.tcp://[email protected]$driverHost:$driverPort/user/Supervisor0/$actorName"
    val helloer = actorSystem.actorSelection(url)
    helloer ! "Hello"
    helloer ! "from"
    helloer ! "Spark Streaming"
    helloer ! "with"
    helloer ! "Scala"
    helloer ! "and"
    helloer ! "Akka"

    import java.util.concurrent.TimeUnit.MINUTES
    ssc.awaitTerminationOrTimeout(timeout = MINUTES.toMillis(1))
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

The program uses the customeReceiverInputDstream implementation. Below is the custom receiver.

customeReceiverInputDstream.scala

package pl.japila.spark.streaming 

import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.storage.StorageLevel 

case class CustomReceiverInputDStream[T](override val storageLevel: StorageLevel) extends Receiver[T](storageLevel) {
  def onStart() {
    println("\nHello from CustomReceiver.START\n")
  }

  def onStop() {
    println("\nHello from CustomReceiver.STOP\n")
  }
}
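Note that `CustomReceiverInputDStream` above only prints on start/stop and never calls `store()`, so its stream stays empty by design. As a sketch of the shape a data-producing receiver takes, here is a minimal stand-in: `FakeReceiver` below is a hypothetical stub replacing Spark's `org.apache.spark.streaming.receiver.Receiver` so the example runs on its own; a real receiver would extend the Spark class instead.

```scala
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Spark's Receiver[T]: it collects stored
// records in a buffer instead of handing them to Spark Streaming.
abstract class FakeReceiver[T] {
  private val stored = ListBuffer.empty[T]
  def store(record: T): Unit = stored.synchronized { stored += record }
  def storedRecords: List[T] = stored.synchronized { stored.toList }
  def onStart(): Unit
  def onStop(): Unit
}

// Shape of a receiver that actually produces data: onStart spawns a
// thread that calls store() for every record it generates.
class LineReceiver(lines: Seq[String]) extends FakeReceiver[String] {
  @volatile private var running = false

  def onStart(): Unit = {
    running = true
    val producer = new Thread {
      override def run(): Unit = lines.foreach(l => if (running) store(l))
    }
    producer.start()
  }

  def onStop(): Unit = { running = false }
}

object ReceiverDemo {
  def main(args: Array[String]): Unit = {
    val r = new LineReceiver(Seq("a", "b", "c"))
    r.onStart()
    TimeUnit.MILLISECONDS.sleep(200) // crude wait for the producer thread
    r.onStop()
    println(r.storedRecords) // List(a, b, c) once the producer thread has run
  }
}
```

The key point the stub illustrates: `onStart()` must return quickly and do the receiving on its own thread, calling `store()` for each record.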

Below is the dead-letter output I am getting:

...

Hello from CustomReceiver.START

...

17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805400 
[INFO] [10/10/2017 08:00:05.475] [SparkStreamingAkka-akka.actor.default-dispatcher-6] [akka://SparkStreamingAkka/deadLetters] Message [java.lang.String] from Actor[akka://SparkStreamingAkka/deadLetters] to Actor[akka://SparkStreamingAkka/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
... (the same dead-letter INFO line repeats for messages [2] through [7]) ...
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805600 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805600 
[INFO] [10/10/2017 08:00:05.693] [Executor task launch worker-0] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] 
[INFO] [10/10/2017 08:00:05.696] [Executor task launch worker-0] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552] 
17/10/10 08:00:05 INFO ActorReceiverSupervisor: Supervision tree for receivers initialized at:akka://streaming-actor-system-0/user/Supervisor0 
17/10/10 08:00:05 INFO ReceiverSupervisorImpl: Called receiver 0 onStart 
17/10/10 08:00:05 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped 
17/10/10 08:00:05 INFO ActorReceiverSupervisor: Started receiver worker at:akka://streaming-actor-system-0/user/Supervisor0/helloer 

=== Helloer is starting up === 
=== path=akka://streaming-actor-system-0/user/Supervisor0/helloer === 

17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805800 
17/10/10 08:00:05 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636805800 
17/10/10 08:00:06 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1507636806000 
...

Why not use "actorOf" instead of "actorSelection", which does work? As the Akka documentation says, "actorSelection only looks up existing actors when messages are delivered, i.e. it does not create actors, nor does it verify the existence of actors when the selection is created." – EmiCareOfCell44


@EmiCareOfCell44 I tried using actorOf. This time there were no dead-letter messages, but it still did not display the messages sent, as shown in the code above. – Mahesh


There is no println inside the case clause. What do you expect it to print? – EmiCareOfCell44
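To make the comment concrete: `receive` only calls `store(s)`, which hands the message to Spark Streaming but prints nothing itself. A handler that also logs each message would make deliveries visible on the console. Here is a self-contained sketch; `store` below is a hypothetical stand-in for `ActorReceiver.store`, so the snippet runs without Spark:

```scala
object ReceiveDemo {
  // stand-in for ActorReceiver.store, so the snippet runs on its own
  val storedMessages = scala.collection.mutable.ListBuffer.empty[Any]
  def store(s: Any): Unit = storedMessages += s

  // a receive handler that also prints, so delivered messages show up on the console
  val receive: PartialFunction[Any, Unit] = {
    case s => println(s"received: $s"); store(s)
  }

  def main(args: Array[String]): Unit = {
    receive("Hello")
    receive("from Spark Streaming")
    println(storedMessages.toList) // List(Hello, from Spark Streaming)
  }
}
```

In the real `Helloer`, the same one-line change (`case s => println(s"received: $s"); store(s)`) would distinguish "messages never arrive" from "messages arrive but are only visible after the batch prints".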

Answer


Confirmed. The problem here is that the actor created to act as the source, "helloer", is started in one ActorSystem, while this code tries to look it up from a different ActorSystem ("SparkStreamingAkka") via akka.remote, which is why the full akka.tcp URL is used. That does not work in this code; more investigation is needed... However, this example does not necessarily require a separate ActorSystem. A workaround can be:

import _root_.akka.actor.{Actor, Props}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

class Helloer extends ActorReceiver {
  override def preStart() = {
    println("")
    println("=== Helloer is starting up ===")
    println(s"=== path=${context.self.path} ===")
    println("")
  }
  def receive = {
    // store() method allows us to store the message so Spark Streaming knows about it
    // This is the integration point (from Akka's side) between Spark Streaming and Akka
    case s => store(s)
  }
}

// Create a common actor system
object CreateActorSystem {
  lazy val as = _root_.akka.actor.ActorSystem("ActorSystemSpark")
}

object StreamingApp {
  def main(args: Array[String]) {
    // Configuration for a Spark application.
    // Used to set various Spark parameters as key-value pairs.
    val driverPort = 7777
    val driverHost = "localhost"
    val conf = new SparkConf()
      .setMaster("local[*]") // run locally with as many threads as CPUs
      .setAppName("Spark Streaming with Scala and Akka") // name in web UI
      .set("spark.logConf", "true")
      .set("spark.driver.port", driverPort.toString)
      .set("spark.driver.host", driverHost)
    val ssc = new StreamingContext(conf, Seconds(10))

    val actorName = "helloer"

    // This is the integration point (from Spark's side) between Spark Streaming and the Akka system
    // It's expected that the actor we're now instantiating will `store` messages (to close the integration loop)

    // Pass the shared actor system as a parameter
    val actorStream = AkkaUtils.createStream[String](ssc, Props[Helloer](), actorName, actorSystemCreator = () => CreateActorSystem.as)

    // describe the computation on the input stream as a series of higher-level transformations
    actorStream.reduce(_ + " " + _).print()

    // Custom receiver
    import pl.japila.spark.streaming.CustomReceiverInputDStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    val input: ReceiverInputDStream[String] = ssc.receiverStream[String](CustomReceiverInputDStream(StorageLevel.NONE))
    input.print()

    // Data ingestion from Kafka
    //import org.apache.spark.streaming.kafka._

    // start the streaming context so the data can be processed
    // and the actor gets started
    ssc.start()

    // FIXME wish I knew a better way to handle the asynchrony
    java.util.concurrent.TimeUnit.SECONDS.sleep(3)

    val actorSystem = CreateActorSystem.as

    // Get the actor from its path. There is no need for akka.remote here.
    val helloer = actorSystem.actorSelection("/user/Supervisor0/helloer")

    helloer ! "Hello"
    helloer ! "from"
    helloer ! "Spark Streaming"
    helloer ! "with"
    helloer ! "Scala"
    helloer ! "and"
    helloer ! "Akka"

    import java.util.concurrent.TimeUnit.MINUTES
    ssc.awaitTerminationOrTimeout(timeout = MINUTES.toMillis(1))
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
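The reason the workaround helps: `actorSelection` resolves against the `ActorSystem` it is called on, and in the original code the `helloer` actor lived in the system Spark created internally while the lookup went out over `akka.tcp` to a differently named system, so every message was dead-lettered. The failure mode can be illustrated without Akka or Spark by modeling path resolution as a registry keyed by (system name, actor path); everything below is a hypothetical stand-in, not Akka's API:

```scala
object SelectionDemo {
  // Hypothetical model of actor-path resolution: a selection delivers
  // only if the (system name, actor path) pair was actually registered.
  val registry = Map(
    ("ActorSystemSpark", "/user/Supervisor0/helloer") -> "Helloer"
  )

  def select(system: String, path: String): Either[String, String] =
    registry.get((system, path)).toRight("dead letters")

  def main(args: Array[String]): Unit = {
    // Original code: the lookup targeted a different system over akka.tcp,
    // so no actor was found and every message went to dead letters.
    println(select("SparkStreamingAkka", "/user/Supervisor0/helloer")) // Left(dead letters)

    // Workaround: creator and caller share one ActorSystem, so the
    // relative path resolves locally.
    println(select("ActorSystemSpark", "/user/Supervisor0/helloer")) // Right(Helloer)
  }
}
```

This is why passing `actorSystemCreator = () => CreateActorSystem.as` to `AkkaUtils.createStream` matters: it puts the supervisor (and hence `/user/Supervisor0/helloer`) in the same system that later performs the selection.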


Thank you! Your fix is working. I will read up more on this akka.tcp business. – Mahesh


Using 'actorOf' I was able to get remote messaging working, but I still have a problem when using 'actorSelection': https://stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-wor – Mahesh