3

Akka 2.4.16부터 리 액티브 스트림의 "원격"구현은 없다는 것을 알고 있습니다. 이 사양은 단일 JVM에서 실행되는 스트림에 중점을 둡니다.여러 개의 JVM이 포함 된 Akka 스트림에서 백 프레셔를 유지하는 방법

그러나 배후 압력을 유지하면서 일부 처리를 위해 다른 JVM을 사용하는 유스 케이스를 고려하십시오. 아이디어는 스트림을 실행하는 사용자 인터페이스를 제공하는 기본 응용 프로그램을 갖는 것입니다. 예를 들어,이 스트림에는 다른 컴퓨터에서 실행해야하는 무거운 계산을 수행하는 단계가 있습니다. 일정에 동맥으로 단순화 Akka의 HTTP (Stackoverflow)

  • 를 사용하여 TCP를 통해 스트림을 연결

    다른 어떤 대안이 있습니까? 위의 중요한 단점이 있습니까? 고려해야 할 특별한 특징은 무엇입니까?

    업데이트 :이 질문은 단일 유스 케이스에만 국한되지 않습니다. 일반적으로 분산 환경에서 스트림을 사용하여 작업하는 모든 가능한 방법에 관심이 있습니다. 이는 예를 들어 액터를 .mapAsync으로 통합하는 하나의 스트림 만 포함 할 수 있습니다. Akka HTTP를 통해 통신하는 두 대의 컴퓨터에 두 개의 개별 스트림이있을 수 있습니다. 유일한 요구 사항은 모든 구성 요소간에 역 압력을 강요해야한다는 것입니다.

  • +1

    나는 당신이 뭔가를 잘못 이해하고 있다고 생각합니다. 그래서 ... 당신은 어떻게 JVM 스트림을 가질 수 있습니까? 자 ... 실제로 다른 jvm에있는 구성 요소를 가지고 있습니다. 이제이 특별한 경우의 구성 요소가 액터가 될 것임을 이해해야합니다. 따라서 ... '원격 액터'와 함께 FlowShape/Sink/Source를 생성하면 동맥이 메시지를 처리합니다. –

    +0

    블로그 게시물에 따르면, 두 명의 배우가 서로 의사 소통 할 때 동맥은 허리 통증을 유지합니다. 제 질문은 오히려 예를 들어 스트림에서 원격 액터를 통합하기 위해'.mapAsync'를 사용하는 것은 같은 결과를 낳습니다 : 다른 머신에서 무언가를 처리하는 스트림을 가짐. 더 일반적으로 묻는 질문 : JVM 경계를 넘어서는 스트림을 구현하는 방법은 무엇입니까? – Toaditoad

    답변

    1

    음 ... 예를 들어 추가해야 할 것 같습니다. 이해해야 할 점 중 하나는 BackPressure가 GraphStages의 AsyncBoundries에 의해 처리된다는 것입니다. 그것은 정말로 다른 곳에 존재하는 구성 요소와는 아무런 관련이 없습니다. 또한 ... 그것은 새로운 원격 수송 이외의 동맥에 의존하지 않습니다. 여기

    먼저 응용 프로그램,

    import akka.actor.{Actor, ActorLogging, ActorSystem, Props} 
    import akka.actor.Actor.Receive 
    import com.typesafe.config.{Config, ConfigFactory} 
    
    class MyActor extends Actor with ActorLogging { 
        override def receive: Receive = { 
        case msg @ _ => { 
         log.info(msg.toString) 
         sender() ! msg 
        } 
        } 
    } 
    
    object MyApplication extends App { 
    
        val config = ConfigFactory.parseString(
        """ 
         |akka{ 
         | actor { 
         | provider = remote 
         | } 
         | remote { 
         | enabled-transports = ["akka.remote.netty.tcp"] 
         | untrusted-mode = off 
         | netty.tcp { 
         |  hostname="127.0.0.1" 
         |  port=18000 
         | } 
         | } 
         |} 
        """.stripMargin 
    ) 
    
        val actorSystem = ActorSystem("my-actor-system", config) 
    
        var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor") 
    
    } 
    

    그리고 두 번째 응용 프로그램 ... 실제로 "실행"첫 번째 응용 프로그램에 배우를 사용하여 스트림 아마 가장 간단한 크로스 JVM 스트림의 예입니다 .

    import akka.actor.{ActorPath, ActorSystem} 
    import akka.stream.ActorMaterializer 
    import akka.stream.scaladsl.{Flow, Keep, Sink, Source} 
    import akka.pattern.ask 
    import com.typesafe.config.ConfigFactory 
    
    import scala.language.postfixOps 
    import scala.concurrent.duration._ 
    
    object YourApplication extends App { 
    
        val config = ConfigFactory.parseString(
        """ 
         |akka{ 
         | actor { 
         | provider = remote 
         | } 
         | remote { 
         | enabled-transports = ["akka.remote.netty.tcp"] 
         | untrusted-mode = off 
         | netty.tcp { 
         |  hostname="127.0.0.1" 
         |  port=19000 
         | } 
         | } 
         |} 
        """.stripMargin 
    ) 
    
        val actorSystem = ActorSystem("your-actor-system", config) 
    
        import actorSystem.dispatcher 
    
        val logger = actorSystem.log 
    
        implicit val implicitActorSystem = actorSystem 
        implicit val actorMaterializer = ActorMaterializer() 
    
        val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor") 
    
        val myActorSelection = actorSystem.actorSelection(myActorPath) 
    
        val source = Source(1 to 10) 
    
        // here this "mapAsync" wraps the given T => Future[T] function in a GraphStage 
        val myRemoteComponent = Flow[Int].mapAsync(2)(i => { 
        myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
         (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int]) 
        ) 
        }) 
    
        val sink = Sink.foreach[Int](i => logger.info(i.toString)) 
    
        val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right) 
    
        val streamRun = stream.run() 
    
    } 
    
    +0

    이 예를 들어 주셔서 감사합니다. 내 질문에 말했듯이, 나는'.mapAsync'로 무대를 사용하는 접근법에 익숙합니다. 그러나 나는 그것에 대한 다른 대안이 궁금합니다. 예를 들어, 여러분의 예제를 Akka HTTP에 의해 TCP를 통해 연결되어있는 스트림으로 변환 할 수 있습니다. 내가 명확하게하기 위해 내 질문을 업데이 트하고 있습니다 ... – Toaditoad

    +0

    @Toaditoad 다른 질문을하지 마십시오 제발 - 당신은 스카라와 Akka에 새로운가요?나는 TCP + Akka + Http + Akka Streams + Akka HTTP의 결합 된 이해에 아무런 의미가 없으므로 "Akka HTTP로 TCP를 통해 연결된 스트림"이라는 의미를 솔직히 이해하지 못했습니다. –

    +0

    예, 저는 그 세상에 상당히 새롭습니다. 배우와 스트림으로 비디오 프레임을 처리하기위한 가능한 디자인을 조사하고 있습니다. 따라서 저는 단 하나의 해결책을 찾고있는 것이 아니라 몇 가지 대안을 시도하고자합니다. 아직 TCP/Akka HTTP 아이디어로 시작하지는 않았지만 Konrad Malawski의 대답 (http://stackoverflow.com/a/30693174/4169741)을 보았습니까? 그는 Lightbend와 함께 Akka 지도자 중 한 명이며 그의 대답은 나에게 꽤 분명하게 들립니다. – Toaditoad