0

저는 RxJava를 처음 사용합니다. Observables를 반환하는 저지 RxJava 클라이언트가 몇 대 있습니다. 일부 데이터를 얻으려면 한 번의 호출을해야하며 그 데이터는 다음 3 번의 호출에 대한 입력이됩니다. 나는 그 부르심을 평행하게 만들어주기를 바란다. 마지막으로 모든 데이터가 필요한 모든 호출이 완료되면 계산을 수행하려고합니다. 여기 보이는 방법입니다 여러 통화를 보내 다음, 동기 호출을 할 flatMap를 사용하여 지퍼를 사용하거나 병합 할 수 있습니다여러 관찰 가능 호출을 비동기 적으로 호출 할 수 있지만 그 호출 전후에 동 기적으로 계산을 수행 할 수 있습니까?

interface Service { 
    Observable<ResultA> callServiceA(InitialData input); 
    Observable<ResultB> callServiceB(ResultA resultA); 
    Observable<ResultC> callServiceC(ResultA resultA); 
    Observable<ResultD> callServiceD(ResultA resultA); 
    FinalResult simpleCalculation(ResultA a, ResultB b, ResultC c, ResultD d); 
} 

class MyClass{ 

    @Autowired 
    ExecutorService myExecutorService; 

    Observable<FinalResult> myMethod(InitialData initialData){ 
    /* Make call to ServiceA, get the results, then make calls to services B, C, and D in parallel (on different threads), finally perform simpleCalculation, and emit the result */ 
    } 
} 

답변

0

, 다음 flatMap 그 때 다시 완료.

+1

당신이 나에게 샘플 코드를 제공 할 수 있습니까? – Adam

4

flatMap()zip()은이 상황에서 친구입니다.

Observable<FinalResult> myMethod(InitialData initialData) { 
    return service 
      .callServiceA(initialData) 
      .flatMap(resultA -> Observable.zip(
        service.callServiceB(resultA), 
        service.callServiceC(resultA), 
        service.callServiceD(resultA), 
        (resultB, resultC, resultD) -> 
         service.simpleCalculation(resultA, resultB, resultC, resultD)) 
      ); 
} 

과 같이 표시됩니다 관찰 반환을 사용 :

Subscription subscription = 
     myMethod(new InitialData()) 
       .subscribe(finalResult -> { 
          // FinalResult will end up here. 
         }, 
         throwable -> { 
          // Handle all errors here. 
         }); 
+0

감사합니다. 다른 스레드에서 병렬로 B, C 및 D 서비스를 호출하도록하려면 어떻게해야합니까? 스케줄러를 추가하지 않아도됩니까? – Adam

+0

많은 사람들이 네트워크 호출에 대해 Retrofit을 사용하여 자동으로 병렬로 호출을 예약합니다. 서비스가 이러한 서비스를 동 기적으로 실행하는 경우 각 호출에 .subscribeOn (Schedulers.io())을 추가하십시오. – kjones

+0

개조를 사용하고 있지 않습니다. 저지 클라이언트를 사용하고 있습니다. 과거에는 .observeOn (Schedulers.from (executorService))을 체인 끝에 추가했습니다. 그것들은 .subscribeOn과 어떻게 다른가요? – Adam