2016-11-09 8 views
0

RxJava 1/RxScala에서 다음 상황에서 관찰 할 수있는 소스를 어떻게 조절할 수 있습니까?리소스 부족으로 볼 수있는 역압

def fast: Observable[Foo] // Supports backpressure 

def afterExpensiveOp: Observable[Bar] = 
    fast.flatMap(foo => Observable.from(expensiveOp(foo)) 

// Signature and behavior is out of my control 
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = { 
    if(noResources()) Future.failed(new OutOfResourcesException()) 
    else Future { Bar() } 
} 

가능한 해결 방법은 차단 될 때까지 기다리는 것입니다. 어떤 작품,하지만 매우 우아하고 동시에 여러 요청을 방지 :

def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => 
    Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head) 
) 
+1

아마이 연산자를 직접 작성해야합니다. RxScala에서 연산자는'Subscriber [T] => Subscriber [R]'함수이고'lift '를 사용하여 Observable에 적용 할 수 있습니다. 어떤 점에서, 생성하고있는'Subscriber [R]'은 사용할 수있는 리소스가 있는지를 확인해야만합니다. 그렇다면 상속받은'request' 메소드를 호출하여'fast' observable에서 더 많은 아이템을 가져옵니다. –

답변

0

flatMap 동시 가입자의 수를 제한하는 매개 변수가 있습니다. 이 평면 맵을 사용하면 배후 압력이 처리됩니다.

def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))