2017-02-28 4 views
0

동시 네트워크 요청을 만들어야합니다. 이러한 요청의 결과에 따라 더 많은 요청이 시작될 수 있습니다.알 수없는 병렬 하위 작업 수를 하나의 완료 가능으로 합치십시오.

모든 요청이 완료되고 더 이상의 요청을 만들지 않으면 완료되는 하나의 완료 가능을 얻고 싶습니다. 내 질문은

, 다음 코드를 사용하여 달성이 가능하다 :이 예에서

return Completable.defer(() -> { 
     startRequests(); 
     return Observable.merge(requestSubject.asObservable()).toCompletable(); 
    }); 

이 startRequest가 PublishSubject<Observable<SomeResponse>> 인 requestSubject에 네트워크 요청 (개조)를 추가합니다.

특히 네트워크 요청이 가입 한 IO 스케줄러에서 시작하고 요청 완료 중 하나가 onNext에서 완료 될 때까지 Completable이 완료되면 requestSubject.onComplete()을 호출합니다.

요청을 두 번 실행하지 않고 요청의 응답을 처리하는 방법을 아직 이해하지 못했습니다 (각 구독에 대한 개조 요청).

이 방법으로 작동합니까, 아니면 내가 찾고있는 것을 성취 할 수있는 더 좋은 방법이 있습니까? 감사!

+0

전체 코드입니까? 왜냐하면 나는 하나의 주제를 연기하고 병합하는 데 어떤 시점도 보이지 않기 때문입니다. 'startRequests()'코드도 게시하십시오. – Dimezis

+0

@Dimezis'requestSubject.onNext (client.createRequest (parameter));'를 사용하여 (병렬의 첫 번째 라운드) 네트워크 요청을 만들고 게시하는 루프입니다. 나는 당신이 옳다고 생각한다. 요청을 만드는 것은 그것들을 실행하지 않기 때문에, 나는 단지 호출 스레드에서 그것들을 생성하고 서브 스크립 션에서 시작하는 Completable을 반환 할 수있다. 어쨌든 내가 게시 한 예제는 PublishSubject와 함께 작업하지 않았을 것이므로 ReplaySubject가 필요할 것이라고 생각합니다. – ferbeb

답변

1

질문이 100 % 맞았는지 잘 모르겠지만, 여기에 대략적인 스케치가 있습니다. 나는 Subject을 캐싱을위한 중간 수준으로하고 실제 요청을 방해하지 않을 때가 있다고 믿습니다. 당신은 탈퇴라고 부를 것이다.

1) 귀하에게 2 개의 추가 관찰 관찰 대상이 있다고 가정하십시오.

2) startRequests()에서 (사용자가 필요로하는 일부 스케줄러에서) 두 사람을 구독해야하고 doOnNext 연산자를 적용하고 데이터를 subject에 위임해야합니다. 따라서 주제는 API에서 2 틱 데이터를받습니다.

3) 귀하의 주제에 가입하면 데이터 2 틱을 받게됩니다.

기본적으로 완료 될 때까지 기다릴 필요가 없습니다. N 개의 onNext 틱을 받게됩니다. 그러나 모든 요청이 완료되었다는 표시기를 원한다면 모든 개조 된 관찰 사항을 병합하고 모든 이벤트를 주체에 위임 할 수 있으므로 끝에 N 개의 onNext 틱과 onComplete를 받게됩니다.

0

Subjects을 사용하면 불필요한 합병증이라고 생각합니다. 간단히 flatMap()을 사용하고 Completable으로 끝 부분에서 toCompletable()을 사용하여 전송할 수 있습니다. 특정 루프가 어떻게 작동하는지는 언급하지 않았지만 루프 쿼리에서 이와 같은 것이 나오는 일부 목록이 있다고 가정하면 은 Retrofit을 반환합니다. 쿼리 Observable :

List<Data> list = ...; 
    Observable.from(list) 
      .flatMap(new Func1<Data, Observable<Result>>() { 
       @Override 
       public Observable<Result> call(Data data) { 
        return startRequest(data); 
       } 
      }).toCompletable(); 

두 번째 요청에 대해 더 많은 요청을하는이 결과에 따라,이 경우에 당신이 toList()를 사용하여 모든 요청을 수집 할 수 있습니다, 당신은 하나 개의 onNext() 통지를 얻을 것이다 당신은 모두를 필터링 할 수 있습니다 , 더 많은 데이터를 요청할 때 아이템을내는 Observable을 얻으십시오 :

List<Data> list =...; 
    Observable.from(list) 
      .flatMap(new Func1<Data, Observable<Result>>() { 
       @Override 
       public Observable<Result> call(Data data) { 
        return startRequest(data); 
       } 
      }) 
      .toList() 
      .filter(new Func1<List<Result>, Boolean>() { 
       @Override 
       public Boolean call(List<Result> results) { 
        return shouldRequestMore(results); 
       } 
      }); 
2

flatmap()을 사용하여 Completable으로 변환하면됩니다.검증

@Test 
public void foo() throws Exception { 
    Observable.range(1, 10) 
      .flatMap(this::getNItemsFromNetwork) 
      .flatMap(this::asyncCompuatation) 
      .ignoreElements() 
      .subscribe(() -> System.out.println("onComplete"), 
        (t) -> System.out.println("onError")); 

    Thread.sleep(10000); 
} 

Observable<String> getNItemsFromNetwork(int count) { 
    return Observable.just(count) 
      .subscribeOn(Schedulers.io()) 
      .doOnNext(i -> System.out.println("Executing request for " + count + " on thread: " + Thread.currentThread())) 
      .flatMap(number -> Observable.just("Item nr " + number + ".1", "Item nr " + number + ".2")) 
      .delay(random.nextInt(1000), TimeUnit.MILLISECONDS); 
} 

Observable<String> asyncCompuatation(String string) { 
    return Observable.just(string) 
      .subscribeOn(Schedulers.computation()) 
      .delay(random.nextInt(1000), TimeUnit.MILLISECONDS) 
      .doOnNext(number -> System.out.println("Computing " + number + " on thread: " + Thread.currentThread())); 
} 

출력 : 여기서

병렬 모든 그때는 computation 풀에서 이러한 항목에 대한 계산을 수행 io 웅덩이의 2 개 항목을 리턴 네트워크 요청을 실행 (시뮬레이션)하는 예이다

Executing request for 7 on thread: Thread[RxCachedThreadScheduler-7,5,main] Executing request for 6 on thread: Thread[RxCachedThreadScheduler-6,5,main] Executing request for 5 on thread: Thread[RxCachedThreadScheduler-5,5,main] Executing request for 1 on thread: Thread[RxCachedThreadScheduler-1,5,main] Executing request for 4 on thread: Thread[RxCachedThreadScheduler-4,5,main] Executing request for 3 on thread: Thread[RxCachedThreadScheduler-3,5,main] Executing request for 8 on thread: Thread[RxCachedThreadScheduler-8,5,main] Executing request for 2 on thread: Thread[RxCachedThreadScheduler-2,5,main] Executing request for 9 on thread: Thread[RxCachedThreadScheduler-9,5,main] Executing request for 10 on thread: Thread[RxCachedThreadScheduler-10,5,main] Computing Item nr 7.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 10.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 6.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 3.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 4.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 3.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 6.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 2.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 5.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 5.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 7.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 2.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 10.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 9.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 4.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 9.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 8.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 8.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 1.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 1.2 on thread: Thread[RxComputationThreadPool-1,5,main] onComplete