You can use flatMap() and then convert the result to a Completable (here via ignoreElements()). Proof:
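// Imports assume RxJava 2 (io.reactivex) and JUnit 4; adjust the packages for other versions.
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

// The original snippet uses `random` without declaring it; a plain java.util.Random field is assumed.
private final Random random = new Random();

@Test
public void foo() throws Exception {
    Observable.range(1, 10)
            .flatMap(this::getNItemsFromNetwork)   // first round: simulated network requests in parallel
            .flatMap(this::asyncCompuatation)      // second round: one computation per returned item
            .ignoreElements()                      // drop the items, keep only completion/error as a Completable
            .subscribe(() -> System.out.println("onComplete"),
                    (t) -> System.out.println("onError"));
    // Keep the test thread alive long enough for the asynchronous work to finish.
    Thread.sleep(10000);
}

// Simulates a network request on the io scheduler that returns two items.
Observable<String> getNItemsFromNetwork(int count) {
    return Observable.just(count)
            .subscribeOn(Schedulers.io())
            .doOnNext(i -> System.out.println("Executing request for " + count + " on thread: " + Thread.currentThread()))
            .flatMap(number -> Observable.just("Item nr " + number + ".1", "Item nr " + number + ".2"))
            .delay(random.nextInt(1000), TimeUnit.MILLISECONDS);
}

// Simulates a computation on a single item on the computation scheduler.
Observable<String> asyncCompuatation(String string) {
    return Observable.just(string)
            .subscribeOn(Schedulers.computation())
            .delay(random.nextInt(1000), TimeUnit.MILLISECONDS)
            .doOnNext(number -> System.out.println("Computing " + number + " on thread: " + Thread.currentThread()));
}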
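The example above executes (simulated) network requests on the io pool, each returning 2 items, and then performs a computation on each of those items on the computation pool, all in parallel.

Output: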
Executing request for 7 on thread: Thread[RxCachedThreadScheduler-7,5,main]
Executing request for 6 on thread: Thread[RxCachedThreadScheduler-6,5,main]
Executing request for 5 on thread: Thread[RxCachedThreadScheduler-5,5,main]
Executing request for 1 on thread: Thread[RxCachedThreadScheduler-1,5,main]
Executing request for 4 on thread: Thread[RxCachedThreadScheduler-4,5,main]
Executing request for 3 on thread: Thread[RxCachedThreadScheduler-3,5,main]
Executing request for 8 on thread: Thread[RxCachedThreadScheduler-8,5,main]
Executing request for 2 on thread: Thread[RxCachedThreadScheduler-2,5,main]
Executing request for 9 on thread: Thread[RxCachedThreadScheduler-9,5,main]
Executing request for 10 on thread: Thread[RxCachedThreadScheduler-10,5,main]
Computing Item nr 7.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 10.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 6.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 3.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 4.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 3.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 6.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 2.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 5.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 5.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 7.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 2.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 10.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 9.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 4.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 9.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 8.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 8.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 1.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 1.2 on thread: Thread[RxComputationThreadPool-1,5,main]
onComplete
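
For readers who want to see the same two-stage fan-out without RxJava, here is a rough JDK-only analogue using CompletableFuture. It is only a sketch of the shape (parallel "requests", then parallel per-item work, then one completion signal), not how RxJava implements flatMap() or ignoreElements(). The names ParallelPipelineSketch, fetchItems, compute, computeAll and run are hypothetical, and the two executors merely stand in for Schedulers.io() and Schedulers.computation().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelPipelineSketch {

    // Stage 1 (hypothetical stand-in for getNItemsFromNetwork): two items per request.
    static List<String> fetchItems(int count) {
        return Arrays.asList("Item nr " + count + ".1", "Item nr " + count + ".2");
    }

    // Stage 2 (hypothetical stand-in for the per-item computation).
    static String compute(String item) {
        return "computed " + item;
    }

    // Starts one computation per item on the given pool and completes when all are done.
    static CompletableFuture<List<String>> computeAll(List<String> items, Executor computationPool) {
        List<CompletableFuture<String>> computations = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> compute(item), computationPool))
                .collect(Collectors.toList());
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(computations.toArray(new CompletableFuture[0]));
        // join() does not block here because allDone guarantees every future has finished.
        return allDone.thenApply(ignored -> computations.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    // Runs both stages in parallel and blocks until every item has been processed.
    static List<String> run(int requests) {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        ExecutorService computationPool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            // Fan out: one asynchronous "request" per number, each followed by its computations.
            List<CompletableFuture<List<String>>> perRequest = IntStream.rangeClosed(1, requests)
                    .mapToObj(n -> CompletableFuture.supplyAsync(() -> fetchItems(n), ioPool)
                            .thenCompose(items -> computeAll(items, computationPool)))
                    .collect(Collectors.toList());
            // Waiting for every future plays the role of the single onComplete signal.
            return perRequest.stream()
                    .map(CompletableFuture::join)
                    .flatMap(List::stream)
                    .sorted() // completion order is nondeterministic, so sort for stable output
                    .collect(Collectors.toList());
        } finally {
            ioPool.shutdown();
            computationPool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> results = run(10);
        System.out.println(results.size() + " items processed");
        System.out.println("onComplete");
    }
}
```

Unlike the RxJava version, this sketch collects the results instead of discarding them, which makes it easier to check that every item went through both stages.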
Is this the full code? I ask because I don't see any point in deferring and merging a single subject. Please post the 'startRequests()' code as well. – Dimezis
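@Dimezis It is a loop that creates and publishes the network requests (the first round of parallelism) using 'requestSubject.onNext(client.createRequest(parameter));'. I think you are right: since creating the requests does not execute them, I can simply create them on the calling thread and return a Completable that starts them on subscription. In any case, the example I posted would not have worked with a PublishSubject, so I think a ReplaySubject would be needed. – ferbeb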