음 ... 예를 들어 추가해야 할 것 같습니다. 이해해야 할 점 중 하나는 BackPressure가 GraphStages의 AsyncBoundries에 의해 처리된다는 것입니다. 그것은 정말로 다른 곳에 존재하는 구성 요소와는 아무런 관련이 없습니다. 또한 ... 그것은 새로운 원격 수송 이외의 동맥에 의존하지 않습니다. 여기
는
먼저 응용 프로그램,
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
class MyActor extends Actor with ActorLogging {
override def receive: Receive = {
case msg @ _ => {
log.info(msg.toString)
sender() ! msg
}
}
}
object MyApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="127.0.0.1"
| port=18000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("my-actor-system", config)
var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}
그리고 두 번째 응용 프로그램 ... 실제로 "실행"첫 번째 응용 프로그램에 배우를 사용하여 스트림 아마 가장 간단한 크로스 JVM 스트림의 예입니다 .
import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory
import scala.language.postfixOps
import scala.concurrent.duration._
object YourApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="127.0.0.1"
| port=19000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("your-actor-system", config)
import actorSystem.dispatcher
val logger = actorSystem.log
implicit val implicitActorSystem = actorSystem
implicit val actorMaterializer = ActorMaterializer()
val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor")
val myActorSelection = actorSystem.actorSelection(myActorPath)
val source = Source(1 to 10)
// here this "mapAsync" wraps the given T => Future[T] function in a GraphStage
val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
myActorSelection.resolveOne(1 seconds).flatMap(myActorRef =>
(myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int])
)
})
val sink = Sink.foreach[Int](i => logger.info(i.toString))
val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)
val streamRun = stream.run()
}
나는 당신이 뭔가를 잘못 이해하고 있다고 생각합니다. 그래서 ... 당신은 어떻게 JVM 스트림을 가질 수 있습니까? 자 ... 실제로 다른 jvm에있는 구성 요소를 가지고 있습니다. 이제이 특별한 경우의 구성 요소가 액터가 될 것임을 이해해야합니다. 따라서 ... '원격 액터'와 함께 FlowShape/Sink/Source를 생성하면 동맥이 메시지를 처리합니다. –
블로그 게시물에 따르면, 두 명의 배우가 서로 의사 소통 할 때 동맥은 허리 통증을 유지합니다. 제 질문은 오히려 예를 들어 스트림에서 원격 액터를 통합하기 위해'.mapAsync'를 사용하는 것은 같은 결과를 낳습니다 : 다른 머신에서 무언가를 처리하는 스트림을 가짐. 더 일반적으로 묻는 질문 : JVM 경계를 넘어서는 스트림을 구현하는 방법은 무엇입니까? – Toaditoad