2016-07-10 4 views
3

어떤 이유로, 제 Akka 스트림은 항상 첫 번째 메시지를 "방출"(?)하기 전에 두 번째 메시지를 기다립니다.Akka 리 액티브 스트림은 항상 하나의 메시지 뒤에 있습니다

다음은 내 문제를 보여주는 몇 가지 예제 코드입니다.

val rx = Source((1 to 100).toStream.map { t => 
    Thread.sleep(1000) 
    println(s"doing $t") 
    t 
}) 
rx.runForeach(println) 

출력 산출 :

doing 1 
doing 2 
1 
doing 3 
2 
doing 4 
3 
doing 5 
4 
doing 6 
5 
... 

내가 원하는 무엇 : 당신은 전체 컬렉션 게으른 것을 의미한다 .toStream()를 사용하는

doing 1 
1 
doing 2 
2 
doing 3 
3 
doing 4 
4 
doing 5 
5 
doing 6 
6 
... 
+1

Akka 스트림에는 항상 계산 단계마다 하나의 요소 버퍼가 있습니다. 아마 당신은 그것을보고 있습니다. – eiennohito

답변

5

코드가 설정되는 방식으로, 방출 요소 다운 스트림을 시작하기 전에 Source을 완전히 변형하고 있습니다. 소스를 나타내는 숫자 범위의 toStream을 제거하면 해당 동작을 @slouc 명시된대로 명확하게 볼 수 있습니다. 그렇게하면 다운 스트림 수요에 응답하기 전에 Source이 완전히 변형 된 것을 볼 수 있습니다. 당신이 실제로 SinkSource를 실행하고 중간에 변환 단계를 원한다면, 당신은 시도 할 수 있습니다와 같은 구조 일 : 당신이 변경하면

val transform = 
    Flow[Int].map{ t => 
    Thread.sleep(1000) 
    println(s"doing $t") 
    t 
    } 

Source((1 to 100).toStream). 
    via(transform). 
    to(Sink.foreach(println)). 
    run 

는, 당신은 원하는 효과를 얻을 것이다 이는 다음 요소가 처리되기 전에 하류로 흐르는 요소가 흐름 전체에서 처리된다는 것입니다.

+0

나는 내 문제를 단순화하고 해결할 수있을 것이라고 생각했지만, 나는 그것을 과도하게 단순화했다고 생각한다. 귀하의 대답은 내가 게시 한 문제를 해결하지만, 나는 더 자세한 내용으로 새로운 질문을 만들어야 할 수도 있습니다. – SGHAF

1

. 그것 없이는 출력은 처음 100 번 "1"에서부터 100까지의 숫자가 뒤따를 것입니다. 그러나 Stream은 첫 번째 요소 만 평가하여 "수행중인 1"출력을 얻습니다.이 출력은 멈추는 곳입니다. 다음 요소는 필요할 때 평가됩니다.

이제는 문서에서 이에 대한 세부 정보를 찾을 수 없지만 runForeach에는 현재 요소에서 함수를 호출하기 전에 다음 요소를 사용하는 구현이 있다고 가정합니다. 따라서 요소 n에서 println을 호출하기 전에 먼저 요소 n + 1을 검사하여 (예 : 존재하는 지 확인) n + 1 메시지를 처리합니다. 그런 다음 현재 요소에서 println 함수를 수행하여 메시지 "n"을 얻습니다.

runForeach 전에 실제로 map()이 필요합니까? 데이터를 통해 두 명의 여행사가 필요합니까? 나는 아마 명백한 진술하고있어 알아,하지만 당신은 단지 하나를 사용하여 데이터를 처리 할 경우 다음과 같이 이동 :

val rx = Source((1 to 100).toStream) 
rx.runForeach({ t => 
    Thread.sleep(1000) 
    println(s"doing $t") 
    // do something with 't', which is now equal to what "doing" says 
}) 

다음 당신이 때 평가 있는지의 문제가 없습니다.

+0

게시했을 때 실제로 문제를 단순화했습니다. 스트림은 Java BufferedReader (소켓에서 가져옴)에서 가져옵니다. 'Source (Stream.continually (myBufferedReader.readLine) .map (t => (System.nanoTime, t))) ' 메시지를 실제로 도착했을 때 "타임 스탬프"를 시도하기 위해 원래 이렇게했습니다. 내 기계, 그리고 마침내 그들이 최종 처리되었을 때. 답변을 주셔서 감사합니다. 이것은 지금 나에게 더 분명하다! – SGHAF