2017-10-31 8 views
4

브로드 캐스트 및 지퍼가있는 흐름 그래프가 있습니다. 무언가 (그것이 무엇이든 상관없이)가이 흐름에서 실패하면, 문제가되는 요소를 전달하고 다시 시작하고 싶습니다.Akka 스트림 - 실패 후 브로드 캐스트 및 지퍼가있는 그래프 다시 시작

(1,1) 
(2,2) 
(3,3) 
(4,4) 

우리는 어떤 일을했습니다에만 인쇄, 그것은 교착,

(1,1) 
(2,2) 
(3,3) 
(4,4) 
(5,5) 
(6,6) 
(7,7) 
(8,8) 
(9,9) 

을하지만 :

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val dangerousFlow = Flow[Int].map { 
    case 5 => throw new RuntimeException("BOOM!") 
    case x => x 
    } 
    val safeFlow = Flow[Int] 
    val bcast = builder.add(Broadcast[Int](2)) 
    val zip = builder.add(Zip[Int, Int]) 

    bcast ~> dangerousFlow ~> zip.in0 
    bcast ~> safeFlow ~> zip.in1 

    FlowShape(bcast.in, zip.out) 
}) 

Source(1 to 9) 
    .via(flow) 
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) 
    .runWith(Sink.foreach(println)) 

내가 그것을 인쇄 기대 : 나는 다음과 같은 솔루션을 함께했다 디버깅을했고, 아이들에게 "재개"전략을 적용했는데, 실패 후 dangerousFlow이 다시 시작되어 bcast의 요소를 요구하게되었습니다. bcastsafeFlow이 실제로는 전혀 발생하지 않는 다른 요소를 요구할 때까지 요소를 방출하지 않습니다 (수요가 zip이기 때문에).

스테이지 중 무엇이 잘못되었는지에 관계없이 그래프를 다시 시작할 수 있습니까?

답변

3

문제를 잘 이해하고 있다고 생각합니다. 요소 5이 (가) dangerousFlow에 충돌하면 을 통과하는 요소 5도 중지해야합니다. zip 단계에 도달하면 설명하는 문제가 발생합니다. broadcastzip 단계 사이에서 문제를 해결하는 방법을 모르겠지만 처리가 쉬운 곳에서 문제를 더욱 밀어 붙이는 것은 어떨까요?

dangerousFlow 다음 사용을 고려 :

import scala.util._ 
val dangerousFlow = Flow[Int].map { 
    case 5 => Failure(new RuntimeException("BOOM!")) 
    case x => Success(x) 
} 

에도 문제의 경우, 여전히 데이터를 방출 할 dangerousFlow. 현재는 현재 zip으로 할 수 있으며 그래프의 마지막 단계로 collect 단계를 추가하면됩니다.

Flow[(Try[Int], Int)].collect { 
    case (Success(s), i) => s -> i 
    case (_, i)   => i -> i 
} 
:

지금 당신이 쓴대로, 당신이 정말로 출력에 (5, 5) 튜플을 기대한다면
Flow[(Try[Int], Int)].collect { 
    case (Success(s), i) => s -> i 
} 

, 다음 사용 : 같은 흐름에서이 보일 것이다