2017-09-10 15 views
0

에 감소 그리고 난 형태로 집계 스트림으로 변환 할 :Akka 스트림은 내가 데이터 <pre><code>A A A B B C C C C ... (very long) </code></pre> <p></p>의 정렬 된 스트림이 작은 스트림

(A, 3) (B, 2) (C, 4) 

Akka Streams에서 어떤 연산자를 사용할 수 있습니까?

Source.fromPublisher(publisher) 
    .aggregateSomehow() // ? 
    .runWith(sink) 

나는 .groupBy으로 검토 한하지만 내가하지 미리 카테고리의 번호를 알고 있어야합니다. 또한 나는 그것을 피하기를 원하는 모든 그룹을 기억에 남길 것이라고 믿는다. 처리가 끝난 후 (A, 3)을 버리고 소비하는 리소스를 확보 할 수 있어야합니다.

: This question은 SubFlows와 비슷한 기능을 요구합니다. 그러나 SubFlows를 사용하면 statefulMapConcat 연결자를 사용하는 솔루션이 있기 때문에 필요하지 않은 것처럼 보입니다. 입력 스트림에 요소를 추가 요구

그러나
Source(List("A", "A", "B", "B", "B", "C", "C", "")) 
     .statefulMapConcat({() => 
     var lastChar = "" 
     var count = 0 

     char => if(lastChar == char) { 
      count += 1 
      List.empty 
      } else { 
      val charCount = (lastChar, count) 
      lastChar = char 
      count = 1 
      List(charCount) 
      } 
     }) 
    .runForeach(println) 

의 끝을 표시 :

+0

가능한 복제 [? 어떻게 내가 서브 플로우에 정렬 된 스트림의 그룹 항목 (https://stackoverflow.com/ 질문/45436102/how-do-i-group-items-of-sorted-stream-with-subflows) – chunjef

+0

@chunjef 포인터 주셔서 감사합니다! 분명히 관련이 있지만 확실하지는 않습니다. SubFlows가 반드시 필요한 것은 아닙니다. – Andrejs

답변

0

하나의 옵션은 statefulMapConcat 연결자를 사용하는 것이다.

출력 :

(,0) 
(A,2) 
(B,3) 
(C,2) 

감사 의견의 제안에 대한 @chunjef하는