2016-11-23 3 views
0

내 API는 두 개의 개별 서비스에 대해 약 100 개의 다운 스트림 호출을 쌍으로 만듭니다. 모든 응답을 집계해야 클라이언트에 응답을 보낼 수 있습니다. hystrix-feign을 사용하여 HTTP 호출을합니다.observable을 blocking observable로 변환하여 rxJava를 잘못 사용합니까?

나는 내가 rxJava docs에 나는 다음과 같은

BlockingObservable 차단 연산자를 제공하는 관찰 가능한 다양한입니다 찾을 때까지 우아한 솔루션을 믿었 무엇을 함께했다. 테스트 및 데모 용도로 유용 할 수 있지만 일반적으로 프로덕션 응용 프로그램에는 적합하지 않습니다 (Blocking Observable을 사용해야한다고 생각하면 일반적으로 디자인을 재고해야한다는 신호 임).

이 설정에 따라

List<Observable<C>> observables = new ArrayList<>(); 
for (RequestPair request : requests) { 
    Observable<C> zipped = Observable.zip(
     feignClientA.sendRequest(request.A()), 
     feignClientB.sendRequest(request.B()), 
     (a, b) -> new C(a,b)); 
    observables.add(zipped); 
} 

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>(); 

Observable 
    .merge(observables) 
    .toBlocking() 
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse))); 

return apiResponse; 

몇 가지 질문에 다음과 같이 내 코드는 거의 같습니다

  • 내가 이해 정정 있습니까 내 유스 케이스 주어진 정당화

    1. 이 toBlocking()인가를 그 주 스레드가 forEach()에 도착할 때까지 실제 HTTP 호출이 이루어지지 않습니다.
    2. 코드 forEach() 블록은 다른 스레드에 의해 실행되지만 forEach() 블록에 둘 이상의 스레드가있을 수 있는지 확인할 수 없습니다. 거기에 동시 실행이 있습니까?
  • 답변

    1

    더 나은 옵션은 Observable 다른 사업자에 의해 소비 될 수 있지만 코드를 차단 멀리 얻을 수 있습니다 반환하는 것입니다 (백그라운드 스레드에서 실행하지만, 그것은해야한다.)

    public Observable<D> getAll(Iterable<RequestPair> requests) { 
        return Observable.from(requests) 
        .flatMap(request -> 
         Observable.zip(
          feignClientA.sendRequest(request.A()), 
          feignClientB.sendRequest(request.B()), 
          (a, b) -> new C(a,b) 
         ) 
        , 8) // maximum concurrent HTTP requests 
        .map(both -> doSomeWork(both)); 
    } 
    
    // for legacy users of the API 
    public Collection<D> getAllBlocking(Iterable<RequestPair> requests) { 
        return getAll(requests) 
         .toList() 
         .toBlocking() 
         .first(); 
    } 
    

    forEach이 operati의 전체 순서를 트리거, 내가 메인 스레드가 foreach는()

    예에 도달 할 때까지 실제 HTTP 호출이 이루어되지 않는 것이 이해 수정 있습니까 ons. foreach는() 블록의 코드가 서로 다른 쓰레드에 의해 실행,하지만 난 foreach는() 블록에서 하나 개 이상의 스레드가있을 수 있는지 확인 할 수 없습니다 것을 내가 본

    . 거기에 동시 실행이 있습니까?

    한 번에 하나 개의 스레드가 forEach에서 람다를 실행할 수 있습니다하지만 당신은 실제로 다른 스레드가 입력 볼 수 있습니다.