공지
빅토르 클랑은 코멘트 바와 같이 입력 요소 카운트 출력에 대해 1이 공지 될 때 Tuple2[O,O2]
으로 압축하는 것은 두 흐름 flow1
& flow2
가 1임을 만 실행 가능 요소 수.
그래프 기반 솔루션
튜플 구조체는 Graph 내부에 생성 될 수있다. 사실, 당신의 질문은 거의 완벽 입문 예를 일치 :

링크에서 샘플 코드를 확장을, 당신은 사용할 수 있습니다 Broadcast 및 Zip
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int]()) //different than link
val f1, f2, f4 = Flow[Int].map(_ + 10)
val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})//end RunnableGraph.fromGraph
다소 해키 스트림 솔루션
순수 스트림 솔루션을 찾고있는 경우 중간 스트림을 사용할 수 있지만 Mat
이 유지되지 않을 것이며, 각 입력 요소에 대한 2 개 스트림의 구체화 포함한다 :이 압축하는 일반적인 대부분의 흐름에 대해, 출력 유형을 확인하려면
def andFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[O] = Source
.single(i)
.via(flow1)
.to(Sink.head[O])
.run()
val o2 : Future[O2] = Source
.single(i)
.via(flow2)
.to(Sink.head[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
일반 압축 중
을 것이다 (Seq[O], Seq[O2])
이어야합니다.이 유형은 상기 andFlows
함수 Sink.seq
를 사용하는 대신 Sink.head
의해 생성 될 수있다 : 1 정도로 같은 일반 연결자 (1 개 출력 된 1 개 입력) 힘들 것이다 :
def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[Seq[O]] = Source
.single(i)
.via(flow1)
.to(Sink.seq[O])
.run()
val o2 : Future[Seq[O2]] = Source
.single(i)
.via(flow2)
.to(Sink.seq[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
플로우는 엄격히 1이 아니다. (GraphDSL을 사용하고 브로드 캐스트 + 병합을 사용할 수 있습니다) –