2017-12-06 21 views

답변

1

한 가지 해결책은 필요한 상태를 유지하기 위해 액터를 사용하는 것입니다. Sink.actorRef은 싱크에 기존 액터 ref를 래핑합니다 (예 :

class Keeper extends Actor { 
    var i: Int = 0 

    override def receive: Receive = { 
    case n: Int ⇒ i = n 
    case Keeper.Get ⇒ sender ! i 
    } 
} 

object Keeper { 
    case object Get 
} 

val actorRef = system.actorOf(Props(classOf[Keeper])) 

val q = Source.repeat(1) 
    .scan(0)(_+_) 
    .runWith(Sink.actorRef(actorRef, PoisonPill)) 

val result = (actorRef ? Keeper.Get).mapTo[Int] 

Sink.actorRef을 사용하면 배압이 보존되지 않습니다. 이것은 Sink.actorRefWithAck을 사용하여 향상시킬 수 있습니다. 이에 대한 자세한 내용은 docs에서 확인할 수 있습니다.

+0

배우와의 병렬 처리 및 메시지 전달에 문제가 있습니까? – vgkowski

+0

위의 예와 관련된 병렬 처리가 없습니다. 액터로 순차적으로 메시지를 보내면 액터는 메시지를 순차적으로 처리합니다. 메시지의 순서는 송신자별로 Akka에 의해 보장됩니다. –