4

사용 RxJS 5.0.0-rc.1에 subcribe에서 반환 값을 가져, 나는 yield.next()를 사용하여 데이터를 교환하여 how generators/iterators work 유사한 방법으로 내 ObserverObservable를 전달하기 위해 노력하고있어. 의도는 입니다. .subscribe에 대한 호출이 인 것을 보류하고 그에 따라 내 관찰 가능 스트림에서 다음 값을 수정/업데이트합니다.관찰 가능한

이것이 가능한지 확실하지 않습니다. 그러나, 당신이 캐치 예외는 .subscribe 콜백에 던져 발견. 다음 조각은 "Boom!"를 출력 :

var source = Observable.create((observer) => { 
    try { 
    observer.next(42); 
    } catch (e) { 
    // This will catch the Error 
    // thrown on the subscriber 
    console.log(e.message); 
    } 
    observer.complete(); 
}); 

source.subscribe(() => { 
    throw new Error('Boom!'); 
}); 

그래서, 만약에 대신 던지는 것은, 가입자가 값을 반환? Observable에서 검색 할 수있는 방법이 있습니까? 아마도 나는 이것을 잘못된 방향으로 접근하고있을 것입니다. 그렇다면이 시나리오에서 일을하는 "반응적인"방법은 무엇입니까?

감사합니다.


편집 내가 생각 해낸 가능한 방법은 스트림의 모든 항목에 콜백 기능을 제공하는 것입니다

하나. 예 :

var source = Observable.create((observer) => { 
    // This will print "{ success: true }" 
    observer.next({ value: 42, reply: console.log }); 
    observer.complete(); 
}); 

source.subscribe(({ value, reply }) => { 
    console.log('Got', value); 
    return reply({ success: true }); 
}); 

다른 의견은 있습니까? 내 원래의 질문에 내가 달성하기 위해 노력하고 있었는지에 혼란을 가져 때문에


편집 2

, 내 실제 시나리오를 설명 할 것이다. 큐를 통해 메시지를 관리하기위한 모듈의 API를 작성하고 있습니다 (단순화 된 메모리, AMQP-RPC 메커니즘과 비슷합니다). RxJS가 적합 할지라도.

예상대로 작동합니다 : Publisher은 메시지를 대기열로 푸시합니다.이 큐는 Consumer으로 배달됩니다. 잠시 말하면 ConsumerPublisher에 회신 할 수 있습니다. 관심있는 응답을 수신 할 수 있습니다. 이상적인 시나리오에서

는 API는 다음과 같이 보일 것입니다 : 예 42

Consumer().consume('some.pattern') 
    .subscribe(function(msg) { 
    // Do something with `msg` 
    console.log(msg.foo); 
    return { ok: true }; 
    }); 

Publisher().publish('some.pattern', { foo: 42 }) 
// (optional) `.subscribe()` to get reply from Consumer 

인쇄 할 것이다.

Publisher에 응답하는 논리는 Consumer 기능 내에 있습니다. 그러나 실제 응답은 .subscribe() 콜백의 입니다. 어떤 점이 저의 원래 질문으로 이어집니다 : 스트림의 생성자로부터 반환 된 값을 가져 오려면 어떻게해야합니까?

/** * Returns an async handler that gets invoked every time * a new message matching the pattern of this consumer * arrives. */ function waitOnMessage(observer) { return function(msg) { observer.next(msg); // Conceptually, I'd like the returned // object from `.subscribe()` to be available // in this scope, somehow. // That would allow me to go like: // `sendToQueue(pubQueue, response);` } } return Observable.create((observer) => { queue.consume(waitOnMessage(observer)); }); 

는 더 이상 의미가 있습니까

: 같은 Consumer#consume()

생각해?

답변

2

실제로 생성자와 관찰 가능 항목간에 유사점이 있습니다. here에서 볼 수 있듯이 observables (값의 비동기 시퀀스)는 iterables의 비동기 버전 (동기 값 시퀀스)입니다.

이제 발전기는 Iterable을 반환하는 함수입니다. 그러나 Rxjs Observable은 생성자 - 즉, subscribe을 호출하여 실행/시작하는 생성자와 Observer 개체를 전달하여 관찰 된 값의 생성 된 비동기 시퀀스를 모두 포함합니다. subscribe 호출은 값 수신을 중지 할 수있는 Disposable을 반환합니다 (연결 끊기). 따라서 생성자와 관찰 가능 객체는 이중 개념이지만 사용하는 API는 다릅니다.

기본적으로 rxjs observable API를 사용하여 양방향 통신을 수행 할 수 없습니다. 아마도 과목을 통해 뒷 채널을 구성하여이를 수행 할 수 있습니다 (주기를 시작하기 위해 초기 값을 가져야 함).

function twoWayObsFactory (yield, initialValue) { 
    var backChannel = Rx.BehaviorSubject(initialValue); 
    var next = backChannel.next.bind(backChannel); 
    return { 
    subscribe : function (observer) { 
     var disposable = backChannel.concatMap(yield) 
     .subscribe(function(x) { 
      observer(next, x); 
     }); 
     return { 
     dispose : function(){disposable.dispose(); backChannel.dispose();} 
     } 
    } 
    } 
} 

// Note that the observer is now taking an additional parameter in its signature 
// for instance 
// observer = function (next, yieldedValue) { 
//    doSomething(yieldedValue); 
//    next(anotherValue); 
//   } 
// Note also that `next` is synchronous, as such you should avoir sequences 
// of back-and-forth communication that is too long. If your `yield` function 
// would be synchronous, you might run into stack overflow errors. 
// All the same, the `next` function call should be the last line, so order of 
// execution in your program is the same independently of the synchronicity of 
// the `yield` function 

그렇지 않으면, 당신이 설명하는 행동이 비동기 발전기의 것 같다 :

var backChannel = Rx.Subject(); 
backChannel.startWith(initialValue).concatMap(generateValue) 
    .subscribe(function observer(value){ 
    // Do whatever 
    // pass a value through the backChannel 
    backChannel.next(someValue) 
}) 
// generateValue is a function which takes a value from the back channel 
// and returns a promise with the next value to be consumed by the observer. 

당신은 함께 그 포장 고려할 수 있습니다. 나는 그런 것을 사용하지는 않았지만 이것이 자바 스크립트의 미래 버전에 대한 제안이기 때문에 나는 이 이미 Babel (cf. https://github.com/tc39/proposal-async-iteration)과 함께 시도해 볼 수 있다고 생각한다.

편집 :

당신은 루프 백 메커니즘을 찾고 있습니다 (이하 범용 접근하지만 당신이 원하는 것은 간단하다 경우 수 아주 잘, 사용 사례에 맞는) 경우, expand 연산자 도움이 될 수 있습니다. 동작을 이해하기 위해서는 구체적인 상황에서 사용의 예 SO에 doc, 다음과 같은 답변을 확인하십시오 :

을 기본적으로 expand을 사용하면 다음을 할 수 있습니다 모두 다운 스트림 값을 내 보내며 동시에 프로듀서에서 해당 값을 다시 피드합니다.

+0

나는 당신의 생각을 좋아합니다. 그 사용법의 예를 보여 주시겠습니까? 아마도,'twoWayObsFactory'를 사용하여 원래의 질문에 간단한 샘플 코드를 다시 구현할 수 있을까요? –

+0

나는 방금'expand' 연산자 (Rxjs v4)를 점검 했습니까? 그것은 당신이 피드백 루프를 가질 수 있습니다. 그런데 간단한 코드는 당신이 묘사하는 문제의 좋은 예가 아닙니다. 당신은 내가 보는 것에 대한 그 예에서 관측 가능한 값을 다시 보내지 않을 것입니다. – user3743222

+0

값을 보내고 있습니다. 사실 전체 개체입니다 :'{success : true}'. 관측 가능은 인쇄를 제외하고는 아무 것도하지 않습니다. 그러나 원하는대로 응답을 변경하면 어떤 방식 으로든 응답을 조작 할 수 있습니다. 나는 ** EDIT ** 밑에 나의 두번째 예를 이야기하고있다. –