2017-10-24 4 views
0

Ive에 이와 같이 수천 개의 파일이 있습니다.Akka Streams : 파일 소스 제한

Mr|David|Smith|[email protected] 
Mrs|Teri|Smith|[email protected] 
... 

나는 각 줄을 하류에 있지만 제한된 방식으로 방출하는 파일을 읽고 싶습니다. 1/sec.

흐름에서 조절 기능을 작동시키는 방법을 알 수 없습니다.

flow1 (아래)은 1 초 후에 첫 번째 줄을 출력 한 다음 종료됩니다.
flow2 (아래)은 1 초 동안 대기 한 다음 전체 파일을 출력합니다.

val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file) 

val flow1 = Flow[ByteString]. 
       via(Framing.delimiter(ByteString(System.lineSeparator),10000)). 
       throttle(1, 1.second, 1, ThrottleMode.shaping). 
       map(bs => bs.utf8String) 

val flow2 = Flow[ByteString]. 
       throttle(1, 1.second, 1, ThrottleMode.shaping). 
       via(Framing.delimiter(ByteString(System.lineSeparator), 10000)). 
       map(bs => bs.utf8String) 

val sink = Sink.foreach(println) 
val res = source.via(flow2).to(sink).run().onComplete(_ => system.terminate()) 

docs의 해결책을 모을 수 없습니다.

모든 포인터가 크게 도움이됩니다. flow1

답변

1

사용 runWith, 대신 to :

val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file) 

val flow1 = 
    Flow[ByteString] 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000)) 
    .throttle(1, 1.second, 1, ThrottleMode.shaping) 
    .map(bs => bs.utf8String) 

val sink = Sink.foreach(println) 

source.via(flow1).runWith(sink).onComplete(_ => system.terminate()) 
+0

완벽한, 감사 제프! – Nio

+0

그래서 내 흐름이 의도 한대로 작동하고 있었다고 생각하지만 싱크를 사용하여 잘못 디버깅하고있었습니다. – Nio