브로드 캐스트 및 지퍼가있는 흐름 그래프가 있습니다. 무언가 (그것이 무엇이든 상관없이)가이 흐름에서 실패하면, 문제가되는 요소를 전달하고 다시 시작하고 싶습니다.Akka 스트림 - 실패 후 브로드 캐스트 및 지퍼가있는 그래프 다시 시작
(1,1)
(2,2)
(3,3)
(4,4)
우리는 어떤 일을했습니다에만 인쇄, 그것은 교착,
(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)
을하지만 :
val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val dangerousFlow = Flow[Int].map {
case 5 => throw new RuntimeException("BOOM!")
case x => x
}
val safeFlow = Flow[Int]
val bcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Int, Int])
bcast ~> dangerousFlow ~> zip.in0
bcast ~> safeFlow ~> zip.in1
FlowShape(bcast.in, zip.out)
})
Source(1 to 9)
.via(flow)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.foreach(println))
내가 그것을 인쇄 기대 : 나는 다음과 같은 솔루션을 함께했다 디버깅을했고, 아이들에게 "재개"전략을 적용했는데, 실패 후 dangerousFlow
이 다시 시작되어 bcast
의 요소를 요구하게되었습니다. bcast
은 safeFlow
이 실제로는 전혀 발생하지 않는 다른 요소를 요구할 때까지 요소를 방출하지 않습니다 (수요가 zip
이기 때문에).
스테이지 중 무엇이 잘못되었는지에 관계없이 그래프를 다시 시작할 수 있습니까?