2012-03-06 4 views
0

하나의 주제를 통해 모든 다른 이벤트 스트림을 프록시해야합니다.하나의 주제를 사용하여 서로 다른 이벤트 스트림을 전파하십시오.

는이 코드를 내놓았다 :

var mySubject, 
    getObservable; 

getObservable = function (subject, eventName) { 
    return subject 
     .asObservable() 
     .filter(function (x) { 
      return x.EventName === eventName; 
     }) 
     .flatMap(function (x) { 
      if (x.Type === 'onNext') { 
       return Rx.Observable.return(x.Data); 
      } 

      if (x.Type === 'onError') { 
       return Rx.Observable.throw(x.Data); 
      } 

      return Rx.Observable.empty(); 
     }); 
}; 

mySubject = new Rx.Subject(); 

getObservable(mySubject, 'foo') 
    .subscribe(function(x){ 
     console.log('foo onNext ' + x); 
    }, function(x){ 
     console.log('foo onError ' + x); 
    }, function(){ 
     console.log('foo onComplete'); 
    }); 

getObservable(mySubject, 'bar') 
    .subscribe(function(x){ 
     console.log('bar onNext ' + x); 
    }, function(x){ 
     console.log('bar onError ' + x); 
    }, function(){ 
     console.log('bar onComplete'); 
    }); 

mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5}); 
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'}); 

mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5}); 
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'}); 

있어 출력 :

foo onNext 5 

bar onNext 5 
bar onError Error message 

예상 출력 : bar 이벤트의

foo onNext 5 
foo onCompleted 

bar onNext 5 
bar onError Error message 

, 매력처럼 작동 : onNext 전파됩니다. d 오류가 발생하자마자 onError 함수가 호출되고 이벤트 스트림이 완료됩니다. 그러나 onComplete에서 작동하지 않습니다.

완전한 알림이 발생할 때마다 Rx.Observable.empty()이 호출되지만 구독자 onComplete 처리기가 호출되지는 않습니다. 대신 자사의 전화는 onNext입니다.

답변

1

getObservable 함수는 subject을 통해 전송 된 eventName 이벤트를 구독하는 관찰 가능 목록을 반환합니다. .NET Observable.SelectMany에

var mySubject, 
 
    getObservable; 
 

 
getObservable = function (subject, eventName) { 
 
    return Rx.Observable.create(function (observer) { 
 
     subject 
 
      .asObservable() 
 
      .filter(function(x) { 
 
       return x.EventName === eventName; 
 
      }) 
 
      .map(function(x) { 
 
       if (x.Type === 'onNext') { 
 
        observer.onNext(x.Data); 
 
       } 
 

 
       if (x.Type === 'onError') { 
 
        observer.onError(x.Data); 
 
       } 
 
       
 
       if (x.Type === 'onCompleted') { 
 
        observer.onCompleted(); 
 
       } 
 
       
 
       return x; 
 
      }) 
 
      .subscribe(); 
 
    }); 
 
}; 
 

 
mySubject = new Rx.Subject(); 
 

 
getObservable(mySubject, 'foo') 
 
    .subscribe(function(x){ 
 
     console.log('SomethingHappened onNext ' + x); 
 
    }, function(x){ 
 
     console.log('SomethingHappened onError ' + x); 
 
    }, function(){ 
 
     console.log('SomethingHappened onComplete'); 
 
    }); 
 

 

 
getObservable(mySubject, 'bar') 
 
    .subscribe(function(x){ 
 
     console.log('DataUpdated onNext ' + x); 
 
    }, function(x){ 
 
     console.log('DataUpdated onError ' + x); 
 
    }, function(){ 
 
     console.log('DataUpdated onComplete'); 
 
    }); 
 

 
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5}); 
 
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'}); 
 

 
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5}); 
 
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>

+0

위대한 작품 마이크. 아직도, 나는 나의 처음 시도가 작동하지 않은 까 왜 보지 못한다. – Christoph

0

하나에 함께 스트림을 병합 Observable.Merge를 사용

let getObservable = function (subject, eventName) { 
    return Rx.Observable.create(function (observer) { 
     subject 
      .asObservable() 
      .filter(function(x) { 
       return x.EventName === eventName; 
      }) 
      .map(function(x) { 
       if (x.Type === 'onNext') { 
        observer.onNext(x.Data); 
       } 

       if (x.Type === 'onError') { 
        observer.onError(x.Data); 
       } 

       if (x.Type === 'onCompleted') { 
        observer.onCompleted(); 
       } 

       return x; 
      }) 
      .subscribe(); 
    }); 
}; 

원래 문제의 데이터를 사용하여 동작 예이며 복합 관찰 가능. IMHO Observable.Merge는 병합 된 관찰 대상 중 하나가 완료 될 때만 완료됩니다.

http://theburningmonk.com/2010/02/rx-framework-iobservable-merge/

그게 문제의 원인 일 수 있습니다.

+0

예, 될 수 있습니다. 그러나 당신의 솔루션도 효과가 있습니다. 내 업데이트 된 버전으로 dispose 함수를 반환하는 곳에서 dispose 함수로 빈 함수를 반환하기 때문에 업데이트 된 버전으로 리팩터링하는 것을 고려하십시오. 또한 select를 사용하지는 않지만 subscribe는 작업을 수행합니다. (부작용이 없어야 함) – Christoph

+0

예, 의미가 있습니다. SignalR과 RxJ를 결합하는 좋은 아이디어. 아픈 시간에 놀아 라. –

+0

관심이 있으시면 https://github.com/cburgdorf/SignalR.Reactive에서 확인하십시오. – Christoph