2016-10-08 5 views
3

나는 비교적 새로운 RxSwift에 익숙하다. 그러나 나는 그것을 내 프로젝트에서 더 많이 사용하기를 고대하고있다. 나는 방금 쓴 연산자에 대한 약간의 피드백을 듣고 싶다.RxSwift로 디 바운스 된 버퍼를 구현하는 것이 맞습니까?

누락 된 기능은 디버깅 된 버퍼입니다. debounce 연산자와 똑같은 동작을하는 버퍼이지만 최신 값만 방출하는 대신 마지막 방출 이후 수집 된 모든 값을 방출해야합니다. 버퍼 연산자의 버전이 존재하지 않지만 (RxSwift에서

// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure 
// 
// we have to multicast the original bursty Observable so we can use it 
// both as our source and as the source for our buffer closing selector: 
Observable<Integer> burstyMulticast = bursty.publish().refCount(); 
// burstyDebounced will be our buffer closing selector: 
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS); 
// and this, finally, is the Observable of buffers we're interested in: 
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced); 

I을 :

RxJava에서 이것은 "폐쇄 선택기"등의 다른 관찰로 버퍼를 이용하여 용이하게 달성 이 문제는 관련이 있다고 생각합니다 : https://github.com/ReactiveX/RxSwift/issues/590), 그래서 직접 문제를 해결하려고했습니다.


내 첫 번째 방법은 바로 디 바운스 버퍼 구축 하였다

extension ObservableType { 
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> { 
     var valueBuffer: [E] = [] 

     let observable = self.do(onNext: { (value) in 
      valueBuffer.append(value) 
     }, onError: { (error) in 
      valueBuffer = [] 
     }, onCompleted: { 
      valueBuffer = [] 
     }, onSubscribe: { 
      valueBuffer = [] 
     }, onDispose: { 
      valueBuffer = [] 
     }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in 
      let emitValues = valueBuffer 
      valueBuffer = [] 
      return Observable<[E]>.just(emitValues) 
     } 

     return observable 
    } 
} 

내 두 번째 방법은 버퍼를 구축 하였다하는합니다 (RxJava 버전과 같은) 임의의 폐쇄 상태 :

extension ObservableType { 
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> { 
     var valueBuffer: [E] = [] 

     return Observable.create { observer in 
      let selectorSubscription = selector.subscribe(onNext: { (value) in 
       let emitValues = valueBuffer 
       valueBuffer = [] 
       observer.on(.next(emitValues)) 
      }, onError: { (error) in 
       valueBuffer = [] 
       observer.on(.error(error)) 
      }, onCompleted: { 
       valueBuffer = [] 
       observer.on(.completed) 
      }, onDisposed: { 
       valueBuffer = [] 
      }) 

      let subscription = self.subscribe(onNext: { (value) in 
       valueBuffer.append(value) 
      }, onError: { (error) in 
       observer.on(.error(error)) 
       selectorSubscription.dispose() 
      }, onCompleted: { 
       observer.on(.completed) 
       selectorSubscription.dispose() 
      }, onDisposed: { 
       observer.on(.completed) 
       selectorSubscription.dispose() 
      }) 
      return subscription 
     } 
    } 
} 

나는 두 연산자를 사용하면 작동하는 것처럼 보이며 onError, onDispose 및 onCompleted 이벤트의 다양한 조합을 처리하여 테스트됩니다.

하지만 적어도 누출이없는 허용 가능한 솔루션이라면 더 많은 경험이있는 사람들로부터 피드백을 듣고 싶습니다. 그리고 어떤 RX 계약을 위반하고 있다면. http://pastebin.com/1iAbUPf8 다음

+1

나는 HTTP합니다 ([RxSwift 여유 채널] 체크 아웃 또한 [RxSwiftExt]에 PR을 제안한다 제안 (https://github.com/RxSwiftCommunity/RxSwiftExt)와 것 .com /). – solidcell

+0

감사합니다. 느슨한 채널은 좋은 생각이며, 나는 PR을 만드는 것에 대해 생각할 것입니다. – michaelk

답변

1

buffer(bufferOpenings, bufferClosingSelector)에 대한 내 꺼야 :

는 또한 몇 가지 테스트 코드와 pasterbin를 만들었습니다. 추가 검토가 필요할 수 있습니다. //rxswift-slack.herokuapp :

extension ObservableType { 

    func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> { 
     var valueBuffer: [E]? = nil 

     let operatorObservable = Observable<[E]>.create({ observer in 
      let subject = PublishSubject<[E]>() 

      let closingsSub = bufferOpenings 
       .doOnNext({ _ in 
        valueBuffer = [] 
       }) 
       .flatMap({ opening in 
        return bufferClosingSelector(opening) 
       }) 
       .subscribeNext({ _ in 
        if let vb = valueBuffer { 
         subject.onNext(vb) 
        } 
        valueBuffer = nil 
       } 
      ) 

      let bufferSub = self.subscribe(
       onNext: { value in 
        valueBuffer?.append(value) 
       }, 
       onError: { error in 
        subject.onError(error) 
       }, 
       onCompleted: { 
        subject.onCompleted() 
       }, 
       onDisposed: { 
       } 
      ) 

      let subjectSub = subject.subscribe(
       onNext: { (value) in 
        observer.onNext(value) 
       }, 
       onError: { (error) in 
        observer.onError(error) 
       }, 
       onCompleted: { 
        observer.onCompleted() 
       }, 
       onDisposed: { 
       } 
      ) 

      let combinedDisposable = CompositeDisposable() 

      combinedDisposable.addDisposable(closingsSub) 
      combinedDisposable.addDisposable(bufferSub) 
      combinedDisposable.addDisposable(subjectSub) 

      return combinedDisposable 

     }) 

     return operatorObservable 
    } 

}