2017-11-30 5 views
0

싱크를 정의하는 두 가지 방법의 차이점은 무엇인가요? [RandomCdr, Future 하는 Future맵 (T => Future [U])과 flatMapConcat (T => Source.fromFuture (Future [U])) 사이의 Akka 스트림 차이

Flow[RandomCdr] 
     .grouped(bulkSize) 
     .flatMapConcat{ (bulk : Seq[RandomCdr]) => 
     Source.fromFuture(collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)) 
     } 
     .toMat(Sink.ignore)(Keep.right) 


Flow[RandomCdr] 
    .grouped(bulkSize) 
    .map((bulk : Seq[RandomCdr]) => collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)) 
    .toMat(Sink.ignore)(Keep.right) 

Future를 [T]를 반환하는 함수 collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)는 reactivemongo 드라이버

답변

0

우선 니펫 여기서 각 수신 일괄 변환한다

이다 완료]은 d는 Future가 사용자가 제공 한 실행 컨텍스트 내에서 실행될 것이라고 말했습니다. 이 시점에서만 다음 번 대량이 다른 Future 등을 생성하여 처리됩니다.

기본적으로 선물은 순서대로 실행됩니다. 이것은 당신이 제공하는 실행 컨텍스트 내에서 실행됩니다

Flow[RandomCdr] 
     .grouped(bulkSize) 
     .mapAsync(parallelism = 1){ (bulk : Seq[RandomCdr]) => 
     collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec) 
     } 
     .toMat(Sink.ignore)(Keep.right) 

두 번째 조각 여기

각 들어오는 대량가 Future로 변환됩니다에 행동과 유사하다. Future은 즉시 Sink.ignore으로 전달되며 해당 참조는 버려집니다.

Future이 동시에 실행되는 횟수를 제어하지 않습니다. 이런 이유로이 방법은 권장되지 않습니다.

개선 된 병렬 처리를 찾으려면 위의 그림과 같이 mapAsync을 사용하고 병렬 처리 매개 변수를 조정하십시오.

+0

두 번째 스 니펫이 배압을 제공하지 않는다는 의미입니까? – vgkowski

+0

정확함, 배후 압력없이 가능한 한 많은 미래를 발사 할 것입니다. –