사용 RxJS 5.0.0-rc.1
에 subcribe에서 반환 값을 가져, 나는 yield
및 .next()
를 사용하여 데이터를 교환하여 how generators/iterators work 유사한 방법으로 내 Observer
및 Observable
를 전달하기 위해 노력하고있어. 의도는 입니다. .subscribe
에 대한 호출이 인 것을 보류하고 그에 따라 내 관찰 가능 스트림에서 다음 값을 수정/업데이트합니다.관찰 가능한
이것이 가능한지 확실하지 않습니다. 그러나, 당신이 수 캐치 예외는 .subscribe
콜백에 던져 발견. 다음 조각은 "Boom!"
를 출력 :
var source = Observable.create((observer) => {
try {
observer.next(42);
} catch (e) {
// This will catch the Error
// thrown on the subscriber
console.log(e.message);
}
observer.complete();
});
source.subscribe(() => {
throw new Error('Boom!');
});
그래서, 만약에 대신 던지는 것은, 가입자가 값을 반환? Observable
에서 검색 할 수있는 방법이 있습니까? 아마도 나는 이것을 잘못된 방향으로 접근하고있을 것입니다. 그렇다면이 시나리오에서 일을하는 "반응적인"방법은 무엇입니까?
감사합니다.
편집 내가 생각 해낸 가능한 방법은 스트림의 모든 항목에 콜백 기능을 제공하는 것입니다
하나. 예 :
var source = Observable.create((observer) => {
// This will print "{ success: true }"
observer.next({ value: 42, reply: console.log });
observer.complete();
});
source.subscribe(({ value, reply }) => {
console.log('Got', value);
return reply({ success: true });
});
다른 의견은 있습니까? 내 원래의 질문에 내가 달성하기 위해 노력하고 있었는지에 혼란을 가져 때문에
편집 2
, 내 실제 시나리오를 설명 할 것이다. 큐를 통해 메시지를 관리하기위한 모듈의 API를 작성하고 있습니다 (단순화 된 메모리, AMQP-RPC 메커니즘과 비슷합니다). RxJS가 적합 할지라도.
예상대로 작동합니다 : Publisher
은 메시지를 대기열로 푸시합니다.이 큐는 Consumer
으로 배달됩니다. 잠시 말하면 Consumer
은 Publisher
에 회신 할 수 있습니다. 관심있는 응답을 수신 할 수 있습니다. 이상적인 시나리오에서
42
을
Consumer().consume('some.pattern')
.subscribe(function(msg) {
// Do something with `msg`
console.log(msg.foo);
return { ok: true };
});
Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer
인쇄 할 것이다.
Publisher
에 응답하는 논리는 Consumer
기능 내에 있습니다. 그러나 실제 응답은 .subscribe()
콜백의 입니다. 어떤 점이 저의 원래 질문으로 이어집니다 : 스트림의 생성자로부터 반환 된 값을 가져 오려면 어떻게해야합니까?
/** * Returns an async handler that gets invoked every time * a new message matching the pattern of this consumer * arrives. */ function waitOnMessage(observer) { return function(msg) { observer.next(msg); // Conceptually, I'd like the returned // object from `.subscribe()` to be available // in this scope, somehow. // That would allow me to go like: // `sendToQueue(pubQueue, response);` } } return Observable.create((observer) => { queue.consume(waitOnMessage(observer)); });
는 더 이상 의미가 있습니까
: 같은Consumer#consume()
의 생각해?
나는 당신의 생각을 좋아합니다. 그 사용법의 예를 보여 주시겠습니까? 아마도,'twoWayObsFactory'를 사용하여 원래의 질문에 간단한 샘플 코드를 다시 구현할 수 있을까요? –
나는 방금'expand' 연산자 (Rxjs v4)를 점검 했습니까? 그것은 당신이 피드백 루프를 가질 수 있습니다. 그런데 간단한 코드는 당신이 묘사하는 문제의 좋은 예가 아닙니다. 당신은 내가 보는 것에 대한 그 예에서 관측 가능한 값을 다시 보내지 않을 것입니다. – user3743222
값을 보내고 있습니다. 사실 전체 개체입니다 :'{success : true}'. 관측 가능은 인쇄를 제외하고는 아무 것도하지 않습니다. 그러나 원하는대로 응답을 변경하면 어떤 방식 으로든 응답을 조작 할 수 있습니다. 나는 ** EDIT ** 밑에 나의 두번째 예를 이야기하고있다. –