2017-12-01 13 views
0

나는 List[String]Source.queue을 가지고 있습니다. 나는이 대기열 문자열 요소를 일정 시간 간격 후에 제공하려고합니다. 다음과 같이 입력하십시오 :Akka SourceQueue가 목록 요소를 보내려면

val data : List[String] = "" 
val tick = Source.tick(0 second, 1 second, "tick") 
tick.runForeach(t => queue.offer(data(??)) 

누군가 나를 도울 수 있습니까?

편집 : 나는 방법을 찾았지만, 각 요소에 대해 특정 시간 간격으로 큐에 List[String]에서 요소를 보내 다음과 같은 방식으로 Source#delay를 사용하려면 더 우아한 방법을

val tick = Source.tick(0 second, 2 second, "tick").zipWithIndex.limit(data.length) 

tick.runForeach(t => { 
    queue.offer(data(t._2.toInt)) 
}) 

답변

0

을 찾고있다 :

val data: List[String] = ??? 

Source(data) 
    .delay(2.seconds, DelayOverflowStrategy.backpressure) 
    .withAttributes(Attributes.inputBuffer(1, 1)) 
    .mapAsync(1)(x => queue.offer(x)) 
    .runWith(Sink.ignore) 

기본값이 16이므로 withAttributes으로 입력 버퍼 크기를 1로 설정하고 DelayOverflowStrategy.backpressure을 사용하십시오. offer 메서드가 Future을 반환하기 때문에 mapAsync을 사용합니다. 이 중 하나는`1 '초 지연 후 큐에 한 번에 모든 요소를 ​​보내는

Source(data) 
    .throttle(1, 2.seconds, 1, ThrottleMode.Shaping) 
    .mapAsync(1)(x => queue.offer(x)) 
    .runWith(Sink.ignore) 
+0

을하지만리스트의 요소들 사이의 시간 간격을 유지하려면 :

또는 Source#throttle를 사용합니다. 아래의 내용은 나에게 도움이되지만 좀 더 우아한 방법을 찾고있다. 'val tick = Source.tick (0 초, 2 초, "tick") .zipWithIndex.limit (randomString.length)' 'tick.runForeach t => { queue.offer (randomString (t._2.toInt)) })' – user3810264

+0

@ user3810264 : 업데이트되었습니다. – chunjef