2017-12-18 22 views
4

경고 : 여기 RxJS newb. 여기 RxJS : 복수 중첩 관측치를 버퍼로 결합하는 방법

내 도전 :

  1. onUnlink$ 관찰이 방출이 ...
  2. 즉시 1 초 최대를 들어, onAdd$ 관찰에서 값을 캡처를 시작 (나는이 파티션 onAddBuffer$ 전화 할게) 때 . 데이터베이스 (A doc$ 관찰을 만들기는) 우리가 onAdd$ 값 중 하나
  3. 관찰 onAddBuffer$의 값 중 하나가 doc$ 값과 일치하는 경우에 대해 일치하는 데 사용할 모델을 가져올
  4. 쿼리,
  5. 을 방출하지 않는
  6. 관찰 onAddBuffer$의 값 중 어느 것도 doc$ 값과 일치하지 않습니다, 또는이 관찰 방출하지 onAddBuffer$ 경우, 이것은 나의 추측했다

doc$ 값을 방출하는 경우 :

// for starters, concatMap doesn't seem right -- I want a whole new stream 
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => { 

    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })) 

    const onAddBuffer$ = onAdd$ 
    .buffer(doc$) // capture events while fetching from db -- not sure about this 
    .takeUntil(Rx.Observable.timer(1000)); 

    // if there is a match, emit nothing. otherwise wait 1 second and emit doc 
    return doc$.switchMap(doc => 
    Rx.Observable.race( 
     onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()), 
     Rx.Observable.timer(1000).mapTo(doc) 
    ) 
); 
}); 

docsToRemove$.subscribe(doc => { 
    // should only ever be invoked (with doc -- the doc$ value) 1 second 
    // after `onUnlink$` emits, when there are no matching `onAdd$` 
    // values within that 1 second window. 
}) 

항상 EmptyObservable을 내 보냅니다. 아마도 single은 일치가 없을 때 undefined을 방출하는 것처럼 보이기 때문에 일치하는 것이 없을 때 전혀 방출하지 않을 것으로 예상합니까? find도 마찬가지입니다.

singlefilter으로 변경하면 아무 것도 방출되지 않습니다.

참고 :이 파일 시스템 이벤트와 이름 변경 시나리오는 - add 이벤트가 unlink 이벤트 1 초 이내에 다음과 방출 된 파일이 일치하는 해시, 그것은 rename 때문에 아무것도 할 수없는 경우. 그렇지 않으면 사실 unlink이고 제거 할 데이터베이스 문서를 방출해야합니다.

onUnlink$.concatMap(unlinkValue => { 
    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share(); 
    const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$); 
    const onAddBuffer$ = onAdd$.buffer(bufferDuration$); 

    return Observable.forkJoin(onAddBuffer$, doc$) 
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ }); 
}); 

single() 운영자가 조금 까다 롭다는 관찰 가능한 소스가를 완료 한 경우에만 후 술어의 기능과 일치하는 항목을 방출 (또는를 방출하기 때문에 :

+0

당신이 꽤 심한 경쟁 조건을 만들고있는 것처럼 들립니다. 시간 초과는 대개이 문제를 처리하는 좋은 방법은 아닙니다. 어떤 이유로 든 일이 오래 걸리면 데이터가 손실됩니다. –

+1

예 여기에는 경쟁 조건이있을 수 있습니다. 궁극적으로이 접근 방식을 무력화시킬 수 있습니다.rxjs를 배울 좋은 기회 인 것 같았습니다. – glortho

답변

3

이것은 당신이 할 수있는 방법을 내 생각이다 두 항목이 있거나 일치하는 항목이없는 경우 오류가 발생합니다.)

race()도 까다 롭습니다. 소스 Observables 중 하나가 완료되고 값을 내 보내지 않으면 race()이 완료되고 아무 것도 출력하지 않습니다.. 얼마 전이 사실을보고했으며 이것이 올바른 행동입니다 (https://github.com/ReactiveX/rxjs/issues/2641 참조).
이것이 코드에서 잘못 나온 것 같습니다.

또한 .mapTo(Rx.Observable.empty())은 각 값을 Observable의 인스턴스로 매핑합니다. 모든 값을 무시하려면 filter(() => false) 또는 ignoreElements() 연산자를 사용할 수 있습니다.