2017-01-28 8 views
0

Akka-stream 설명서에서 모든 병합 된 스트림 (merge, mergeSorted, mergePreferred, zipN, zipWithN)은 병합 된 모든 스트림에 새 요소가있을 때까지 대기하는 것처럼 작동합니다 준비, 병합 전략 적용 (요소를 튜플에 결합하거나 zip 함수를 적용하는 등)하위 스트림 중 _any_에 값 준비가되었을 때 스트림 병합

이것은 오프라인 처리 (예 : 파일 또는 HTTP에서 데이터 읽기 및 결합)에 적합하지만 대기 시간 온라인 처리에서. 내가 만든 데이터 스트림을 병합해야합니다. 다중 웹 소켓 연결을 제공하고 소스 스트림 중 하나가 값을 생성하는 즉시 병합 된 스트림에서 업데이트를 제공합니다. 예 : 소스 스트림 A 및 B가있는 경우 병합 된 스트림에 있어야 할 내용은 다음과 같습니다.

출력 스트림은 몇 가지 초기 값으로 시작됩니다. (None, None). 소스 스트림의 임의 즉시, 값을 생성 할 때

(A:1) (B:<not ready>) -> (Some(1), None) 
(A:2) (B:<not ready>) -> (Some(2), None) 
(A:3) (B:1)   -> (Some(3), Some(1)) 
(A:3) (B:2)   -> (Some(3), Some(2)) 

등 다시 새로운 값은 출력 스트림에서 나타난다.

달성 할 조합자가 있습니까?

+0

> 병합 된 모든 스트림에 새로운 요소가 준비되어있을 때까지 기다려서 - 이것은 사실 일 것 같지 않습니다. 예를 들어, [Merge] (http://doc.akka.io/api/akka/current/index.html#akka.stream.scaladsl.Merge) 문서 페이지를 참조하십시오. 그것은 분명히 "입력 중 하나 **에 사용 가능한 요소가있을 때 방출됩니다"(강조 표시). –

답변

2

설명에서 설명한 것처럼 MergeMergePreferred 단계는 모든 업스트림에 사용 가능한 요소가 없더라도 요소를 다운 스트림으로 방출합니다.

예를 들어 소스 압축 방법을 찾고있는 것처럼 보입니다. 그리고 예, Zip은 모든 업스트림에서 압축 할 요소가있을 때만 압축 된 튜플을 다운 스트림으로 방출합니다. 이를 극복하기 위해 소스를 '들려서 Option'을 생성하고 방출 할 때마다 None을 내 보냅니다. 원본 래퍼는 다음과 같이 표시 될 수 있습니다.

def asOption[In, Mat](source: Source[In, Mat]): Source[Option[In], Mat] = 
    Source.fromGraph(GraphDSL.create(source.map(Option(_))) { 
     implicit builder: GraphDSL.Builder[Mat] => src => 
     import GraphDSL.Implicits._ 

     val noneSource = Source.repeat(None) 
     val merge = builder.add(MergePreferred[Option[In]](1)) 

     src  ~> merge.preferred 
     noneSource ~> merge.in(0) 

     SourceShape(merge.out) 
    }) 

이 때 원본을 압축 할 수 있습니다.

val src1: Source[Int, NotUsed] = ??? 
    val src2: Source[Int, NotUsed] = ??? 

    val zipped = asOption(src1) zip asOption(src2)