0
RxJava 1/RxScala에서 다음 상황에서 관찰 할 수있는 소스를 어떻게 조절할 수 있습니까?리소스 부족으로 볼 수있는 역압
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}
가능한 해결 방법은 차단 될 때까지 기다리는 것입니다. 어떤 작품,하지만 매우 우아하고 동시에 여러 요청을 방지 :
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
아마이 연산자를 직접 작성해야합니다. RxScala에서 연산자는'Subscriber [T] => Subscriber [R]'함수이고'lift '를 사용하여 Observable에 적용 할 수 있습니다. 어떤 점에서, 생성하고있는'Subscriber [R]'은 사용할 수있는 리소스가 있는지를 확인해야만합니다. 그렇다면 상속받은'request' 메소드를 호출하여'fast' observable에서 더 많은 아이템을 가져옵니다. –