2017-10-09 8 views
2

나는 대답은 forkJoin/Promises.all 좋겠어요 ...하지만 ... 나와 함께 베어하시기 바랍니다 조금 더피 감시 (주제)

그래서 ... 저는 약속의 원천을 가지고 있습니다. 그것은 무작위 순서로 도착할 수 있습니다. 그리고 "지금까지 도착한 모든 약속이 끝나면 알려주세요"라고 말할 어떤 방법이 필요합니다.

Promise 기반 솔루션에서 나는 처음에 Promise.all을 사용한다고 생각했지만 약속이 "도착 중"이면서 나머지는 완료되지 않은 상태 일 수 있습니다. 흥미롭게도 "반복 가능한 Promise.all"에 대한 깔끔한 해결 방법이 있습니다. https://stackoverflow.com/a/37819138/239168

저는이 방법을 시도하고 있습니다. 조금이라도 문서를 읽은 후에는 forkJoinPromise.all에 해당한다고 생각합니다. 그러나 같은 문제가 있어도 forkJoin 또는 Promise.all을 안전하게 호출 할 수있는 시점은 없습니다. 다른 하나는 아직 보류 중으로 추가 될 수 있으므로 항상 하나가되어야합니다. 지금까지는 아무 의미가 없기 때문에 몇 가지 지침을 요청할 것입니다.

설정

내가이 주제를 가지고, 내가 때의 약속을 모두 알고 싶어 (이 바보라면, 나는 ... 수신에 새로운 해요 당신의 웃음을 보유) ... 또한 항상 ... 언제든지 새로운 추가 약속을 얻을

private promiseSource = new Subject<Promise<any>>(); 
promises$ = this.promiseSource.asObservable(); 

새로운 약속 "도착"때마다, 난 그냥 주제에 완벽한 추가 해요 수 있습니다

this.promiseSource.next(somePromise); 

내가 마술처럼 생겼 으면하는 것은 - 완결 된 약속 만 지닐 때마다 주제를 "완전"하게하십시오.

오버랩이, 예를 들어이없는 경우

promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({ 
    next: x => ..., 
    error: err => ..., 
    complete:() => { 
    console.log('all promises we got so far are done'); 
    // nice to have, I want this to keep "listening" for new promises 
    promiseSource.youAreNotREALYCompletePleaseReset(); 
    } 
}); 

또는 다른 말로

, 나는 우리가 내용을 살펴 경우, 우리는 비동기 작업을 중복 볼 수 있습니다, 비동기 행동의 관찰을 가지고, 내가 알고 싶은

|<-async action 1->| |<-async action 3->| 
      |<-async action 2->|      |<-async action 4->| 

              /\  /\ 
             find this gap 

예를 들어 http 호출 인 경우 기본적으로 묻습니다. 열린 http 호출이없는 경우 알려주세요.

TL; DR

이 약속에게 RxJS 세계를 기반으로 대답을 구현하는 방법을

...

https://stackoverflow.com/a/37819138/239168

나는 이것을 기반으로 수행하는 매우 간단한 방법을 생각할 수

답변

1

이전 답변. fromPromise을 사용하여 Subject<Promise<any>>Subject<Observable<any>>으로 바꾼 다음 this 대답에 설명 된 active 함수를 사용하여 활성 관찰 가능 개체의 관찰 가능 상태로 줄일 수 있습니다. 일단 당신이 그것을 얻으면, 당신은 "활성 스트림의 배열이 비어있을 때"와 같은 간단한 쿼리로 할 수있는 쿼리를 사용할 수 있습니다.: 당신이 첫 번째 시간을 원하는 경우, .take(1) 또는를 배치 있도록

active(yourSubjectOfObservables).filter(x => x.length === 0).subscribe(() => { 
    // here we are, all complete 
}); 

이 제로에 때마다 활성 스트림 전환의 수를 트리거합니다. 필터와 구독간에 first

아마도 가장 좋은 해결책은 아니지만 개념적으로 간단합니다.

+0

쿨, 나는 여전히 절벽을 등반하고 있습니다 ... 그것을 시도를 줄 것이다 :) –

2

귀하의 질문을 올바르게 해석하는 경우, 보류중인 약속이 있는지 여부를 나타내는 신호에만 관심이 있습니다.

mergescan을 사용하면 대기중인 약속 수를 방출하는 관측 가능 항목을 만들 수 있으며, 그로부터 원하는 신호를 만들 수 있어야합니다.

기본적으로 피험자가 약속을 할 때마다 보류중인 약속 수를 늘려야합니다. 그리고 그 약속들 중 하나가 해결 될 때마다 그 카운트는 감소 될 수 있습니다.

const promises = new Rx.Subject(); 
 

 
const pendingCount = Rx.Observable 
 
    .merge(
 
    promises.mapTo(1), 
 
    promises.mergeMap(p => Rx.Observable.from(p).mapTo(-1)) 
 
) 
 
    .scan((acc, value) => acc + value, 0) 
 
    .do(count => console.log(`${count} pending promise(s)`)); 
 

 
const doneSignal = pendingCount 
 
    .filter(count => count === 0) 
 
    .mapTo("done"); 
 

 
doneSignal.subscribe(signal => console.log(signal)); 
 

 
const timeoutPromise = (delay) => new Promise(resolve => setTimeout(resolve, delay)); 
 

 
promises.next(timeoutPromise(200)); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 100); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 300); 
 
setTimeout(() => promises.next(timeoutPromise(200)), 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>