2017-11-18 15 views
7

다음 작업을 수행하는 Akka 스트림 연결자가 있습니까? (의 지금은 and를 호출하자.)두 흐름을 나란히 작성하는 방법은 무엇입니까?

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat] 

의미는 어떤 소스, 그 요소가 모두 Flow들에 전달되고, 그 출력은 튜플로 새로운 Flow로 결합됩니다 것입니다. (카테고리 이론에서 화살표에 익숙한 사용자는 함수형 프로그래밍 맛, 나는 &&& 같은 뭔가를 찾고 있어요.) 즉, zipalsoTo을 관련 보았다 라이브러리에 두 콤비가 있습니다

. 그러나 전자는 SourceShape을, 후자는 SinkShape을받습니다. 어느 쪽도 GraphShape을 인정하지 않을 것이다. 왜 이런 경우입니까?

내 사용 사례는 다음과 같은 것입니다 :

someSource 
    .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs) 
    .runWith(someSink) 

이 작동하지만, 내가 찾고 있어요 :

someSource 
    .via(someFlowThatReturnsUnit.and(Flow.apply)) 
    .runWith(someSink) 

.and 같은 것을 발견하지 못하면,이 같은 내 원래 Flow 수정 더 청결하고,보다 조성적인 용액.

+1

플로우는 엄격히 1이 아니다. (GraphDSL을 사용하고 브로드 캐스트 + 병합을 사용할 수 있습니다) –

답변

6

공지

빅토르 클랑은 코멘트 바와 같이 입력 요소 카운트 출력에 대해 1이 공지 될 때 Tuple2[O,O2]으로 압축하는 것은 두 흐름 flow1 & flow2가 1임을 만 실행 가능 요소 수.

그래프 기반 솔루션

튜플 구조체는 Graph 내부에 생성 될 수있다. 사실, 당신의 질문은 거의 완벽 입문 예를 일치 :

enter image description here

링크에서 샘플 코드를 확장을, 당신은 사용할 수 있습니다 BroadcastZip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.ignore 

    val bcast = builder.add(Broadcast[Int](2)) 

    val merge = builder.add(Zip[Int, Int]()) //different than link 

    val f1, f2, f4 = Flow[Int].map(_ + 10) 

    val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link 

    in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
       bcast ~> f4 ~> merge 
    ClosedShape 
})//end RunnableGraph.fromGraph 

다소 해키 스트림 솔루션

순수 스트림 솔루션을 찾고있는 경우 중간 스트림을 사용할 수 있지만 Mat이 유지되지 않을 것이며, 각 입력 요소에 대한 2 개 스트림의 구체화 포함한다 :이 압축하는 일반적인 대부분의 흐름에 대해, 출력 유형을 확인하려면

def andFlows[I, O, O2] (maxConcurrentSreams : Int) 
         (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
         (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[O] = Source 
          .single(i) 
          .via(flow1) 
          .to(Sink.head[O]) 
          .run() 

    val o2 : Future[O2] = Source 
          .single(i) 
          .via(flow2) 
          .to(Sink.head[O2]) 
          .run() 

    o zip o2 
    }//end Flow[I].mapAsync 

일반 압축 중

을 것이다 (Seq[O], Seq[O2])이어야합니다.이 유형은 상기 andFlows 함수 Sink.seq를 사용하는 대신 Sink.head 의해 생성 될 수있다 : 1 정도로 같은 일반 연결자 (1 개 출력 된 1 개 입력) 힘들 것이다 :

def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int) 
           (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
           (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[Seq[O]] = Source 
           .single(i) 
           .via(flow1) 
           .to(Sink.seq[O]) 
           .run() 

    val o2 : Future[Seq[O2]] = Source 
           .single(i) 
           .via(flow2) 
           .to(Sink.seq[O2]) 
           .run() 

    o zip o2 
    }//end Flow[I].mapAsync