2017-10-19 27 views
5

내가 어떤 쿼리 기준에 따라 찾거나 위젯을 구성합니다, API를 가지고 말 같은 :비동기 자바 벡터/스트림 데이터에 대한 합성 가능한 반환 값은 9

try { 
    Widget w = getMatchingWidget(criteria); 
    processWidget(w); 
} catch (Throwable t) { 
    handleError(t); 
} 

이제 말을 찾거나 위젯을 구성하는 것은 예상치 못한 비용, 그리고 내가 기다리는 동안 클라이언트는 차단하지 않습니다.

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

클라이언트는 다음 쓸 수 있습니다 :

CompletableFuture<Widget> f = getMatchingWidget(criteria); 
f.thenAccept(this::processWidget) 
f.exceptionally(t -> { handleError(t); return null; }) 

나 : 그래서으로 변경 이제

getMatchingWidget(criteria).whenComplete((t, w) -> { 
    if (t != null) { handleError(t); } 
    else { processWidget(t); } 
}); 

대신 동기식 API가 n 개의 위젯 0을 반환 할 수의 말을하자 :

Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

순진, 내가 쓸 수있다 : - Widgets 사용할 수 있습니다, 또는 모든 때까지 어느 Future 블록

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

그러나,이 실제로 코드 비 차단을하지 않습니다, 그것은 단지 주변의 차단을 밀어 각 Widget을 기다리는 Stream 블록을 반복하는 코드. 내가 원하는처럼 그들이 도착하는 날 각 위젯을 처리 할 무언가, 무언가이다 :

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

하지만이 오류 처리를 제공하지 않습니다, 나는 추가 Consumer<Throwable> errorHandler를 추가 할 경우에도, 그것은 나를하지 않습니다 예를 들어 내 위젯 검색을 다른 쿼리로 작성하거나 결과를 변환 할 수 있습니다.

그래서 Stream (반복성, 변환 가능성)의 특성과 CompletableFuture (비동기 결과 및 오류 처리)의 특성을 결합하여 구성 가능한 것을 찾고 있습니다. (우리가 작업하는 동안 역 압력은 좋을 수도 있습니다.)

java.util.concurrent.Flow.Publisher이 맞습니까? io.reactivex.Observable? 더 복잡한 무엇입니까? 좀 더 단순한가?

+2

대기열에 물건을 넣고 대기열에서 물건을 꺼내는 경향이 있습니다. –

답변

6

유스 케이스는 RxJava가 처리하는 세계에 매우 자연스럽게 속합니다.

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

관찰 가능한 체인을 backgroundScheduler에서 실행됩니다 : 그것은 사용 나타나는

이 기준에 따라 0 개 이상의 위젯을 생성
Observable<Widget> getMatchingWidgets(wc); 

는 다음 각 위젯을 처리 할 수 ​​있습니다 : 우리는 관찰이있는 경우 이것은 종종 스레드 풀 실행자 서비스의 래퍼입니다.당신이 당신의 UI에서 각 위젯의 최종 처리를해야 할 경우 처리 전에 UI 스케줄러로 전환하기 위해 observeOn() 연산자를 사용할 수 있습니다

나에게
getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .observeOn(uiScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

의 RxJava 방식의 우아함이 있다는 것입니다 파이프 라인 관리의 많은 부분을 유창하게 처리합니다. 관찰자 체인을 살펴보면 무슨 일이 일어나고 있는지 정확히 알 수 있습니다.