나는 비교적 새로운 RxSwift에 익숙하다. 그러나 나는 그것을 내 프로젝트에서 더 많이 사용하기를 고대하고있다. 나는 방금 쓴 연산자에 대한 약간의 피드백을 듣고 싶다.RxSwift로 디 바운스 된 버퍼를 구현하는 것이 맞습니까?
누락 된 기능은 디버깅 된 버퍼입니다. debounce
연산자와 똑같은 동작을하는 버퍼이지만 최신 값만 방출하는 대신 마지막 방출 이후 수집 된 모든 값을 방출해야합니다. 버퍼 연산자의 버전이 존재하지 않지만 (RxSwift에서
// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
I을 :
RxJava에서 이것은 "폐쇄 선택기"등의 다른 관찰로 버퍼를 이용하여 용이하게 달성 이 문제는 관련이 있다고 생각합니다 : https://github.com/ReactiveX/RxSwift/issues/590), 그래서 직접 문제를 해결하려고했습니다.
내 첫 번째 방법은 바로 디 바운스 버퍼 구축 하였다
extension ObservableType {
func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
var valueBuffer: [E] = []
let observable = self.do(onNext: { (value) in
valueBuffer.append(value)
}, onError: { (error) in
valueBuffer = []
}, onCompleted: {
valueBuffer = []
}, onSubscribe: {
valueBuffer = []
}, onDispose: {
valueBuffer = []
}).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
let emitValues = valueBuffer
valueBuffer = []
return Observable<[E]>.just(emitValues)
}
return observable
}
}
가
내 두 번째 방법은 버퍼를 구축 하였다하는합니다 (RxJava 버전과 같은) 임의의 폐쇄 상태 :
extension ObservableType {
func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
var valueBuffer: [E] = []
return Observable.create { observer in
let selectorSubscription = selector.subscribe(onNext: { (value) in
let emitValues = valueBuffer
valueBuffer = []
observer.on(.next(emitValues))
}, onError: { (error) in
valueBuffer = []
observer.on(.error(error))
}, onCompleted: {
valueBuffer = []
observer.on(.completed)
}, onDisposed: {
valueBuffer = []
})
let subscription = self.subscribe(onNext: { (value) in
valueBuffer.append(value)
}, onError: { (error) in
observer.on(.error(error))
selectorSubscription.dispose()
}, onCompleted: {
observer.on(.completed)
selectorSubscription.dispose()
}, onDisposed: {
observer.on(.completed)
selectorSubscription.dispose()
})
return subscription
}
}
}
나는 두 연산자를 사용하면 작동하는 것처럼 보이며 onError, onDispose 및 onCompleted 이벤트의 다양한 조합을 처리하여 테스트됩니다.
하지만 적어도 누출이없는 허용 가능한 솔루션이라면 더 많은 경험이있는 사람들로부터 피드백을 듣고 싶습니다. 그리고 어떤 RX 계약을 위반하고 있다면. http://pastebin.com/1iAbUPf8 다음
나는 HTTP합니다 ([RxSwift 여유 채널] 체크 아웃 또한 [RxSwiftExt]에 PR을 제안한다 제안 (https://github.com/RxSwiftCommunity/RxSwiftExt)와 것 .com /). – solidcell
감사합니다. 느슨한 채널은 좋은 생각이며, 나는 PR을 만드는 것에 대해 생각할 것입니다. – michaelk