2017-10-21 3 views
0

나는 (시간이 걸리는) 계산, 그런 일이 있습니다경과 타이머와 함께 RxJS를 사용하여 처리하는 동안 관찰 가능한의 항목을 생략하는 방법

async calculation(input: MyInputType): Promise<MyOutputType> { 
    // fetch some infos from an HTTP endpoint 
    // calculate result by combining received infos with input 
    return result; // Promise which resolves after all calculation is complete 
} 
계산에 대한 입력을 통해 다른 구성 요소에서 온다

관찰 할 수있는 것 (BehaviorSubject)은 때때로 처리 할 수있는 것보다 빠르게 항목을 방출합니다. 따라서 출력은 http 엔드 포인트에서 관찰 가능하고 가져온 정보에 따라 달라지며, 때때로 변경 될 수도 있습니다.

중요 : 마지막 계산 결과 내 응용 프로그램에 대한 관련 언제든지 변수에 사용할 수 있어야합니다.
내 목표는 다음과 같은 동작과 설정에 관찰 체인을 RxJS하고 반응 연산자를 사용하는 것입니다 : 계산은 무시해야 처리하는 동안 (서비스 거부를 방지하기 위해 backpressuring) 방출 소스 BehaviorSubject에서

  • 항목 그러나 ...
  • 현재 계산이 완료되거나 시간 초과가 발생하면 새 계산 항목이 있으면 가장 최근 입력 항목에 대해 새 계산을 직접 시작해야합니다.
  • 5 분 이상 계산이 시작되지 않은 경우 가장 최근의 입력 항목으로 새 계산을 다시 시작해야합니다. (만료되는 HTTP 정보를 업데이트하는 것이 중요합니다.)

이 문제를 어떻게 해결할 수 있습니까?
(RxJS 5, TypeScript, Node.js를 사용하고 있지만 다른 언어의 반응 형 솔루션도 환영합니다.)

+0

나는이 질문이 당신이 성취하려고 노력하는 것과 아주 흡사하다고 생각한다. https://stackoverflow.com/q/46785128/482868 –

+1

계산에 5 분 이상이 걸리면 어떻게됩니까?새 계산이 아직 시작 되었습니까? 그렇다면 이전 계산은 어떻게됩니까? – concat

+0

좋은 질문이지만 두 번째 글 머리 기호에서 언급 한 시간 제한으로 인해이 문제가 발생하지 않아야합니다. 이 계산 제한 시간 초과는 물론 5 분 미만입니다. 따라서 결과는이 상황에서 버려 질 수 있습니다. – Niehno

답변

0

내가 서있는 곳에서 두 가지 질문이 있습니다. 첫 번째 방법은 5 분이 경과하면 이미 계산 된 항목에 대해 계산을 반복하는 방법입니다. 마지막 값의 복제 된 반복으로> 5 분을 채우는 것으로 시작합시다. 복제는 나중에 중요 할 것입니다.

// inputs$: Observable<MyInputType> 
const spacefilled_inputs$ = inputs$.switchMap(input => Observable.interval(5000).map(() => Object.assign({}, input))) 
            .merge(input$); 

번째 문제는 까다 : 모두 (있는 경우) 이전 계산이 새로운 입력 spacefilled_inputs$로부터 방출 될 때, 및 완료되면 calculation 방법을 트리거? 공정한 경고 : RxJS는 이것을 매우 자연스럽게하지 않습니다.

const promise_subject = new Subject<Promise<MyOutputType>>(); 
const results$ = promise_subject.asObservable() 
           .flatMap(promise => Observable.fromPromise(promise)); 
Observable.combineLatest(spacefilled_inputs$, results$) 
      .pairwise() 
      .scan(([inputs_changed, results_changed], [prev_pair, next_pair]) => { 
      // assumes that both MyInputType and MyOutputType are non-primitive objects 
      inputs_changed = inputs_changed || prev_pair[0] !== next_pair[0]; 
      results_changed = results_changed || prev_pair[1] !== next_pair[1]; 

      if(inputs_changed && results_changed) { 
       promise_subject.next(calculation(next_pair[1])); 
       return [false, false]; 
      } 
      return [inputs_changed, results_changed]; 
      }, [true, true]); 

제일, 우리는 주제를 통해 계산 결과를 스트리밍 할 수 있습니다 그들이 더 계산을 신호 할 수 있도록 : 후 내가 먼저 몇 가지 코드를 내려 놓고 설명 할 것이다. 여기서 중요한 것은 pairwise입니다. 이전 및 들어오는 요소의 프레임을 방출하므로 비교할 수 있습니다. scan의 본문은 입력과 결과가 모두 변경되면 상태를 비교하고 유지합니다.

변경 사항 확인시 오브젝트 참조에 대해 완전 항등을 사용합니다. MyOutputType 또는 MyInputType이 프리미티브 인 경우이 값은 중복 된 값을 잘못 표시합니다. 입력 및 결과 스트림에서 카운터와 비슷한 작업을 수행해야합니다.

+0

답변 해 주셔서 감사합니다. 첫 번째 부분 : 나는 솔루션을 아주 좋아합니다. 왜 'Object.assign'이 필요한지 궁금합니다. (내 입력 객체는 꽤 클 수 있습니다.) mapTo를 호출하는 것으로 충분하지 않습니까? 두 번째 부분으로 : 나는 아직 귀하의 코드를 이해하지 못했다는 것을 인정해야합니다. (코드 100 %를 입력하십시오.) 입력란에 $ .switchMap (input => Observable.interval (5000) . 해결책은 나에게 비교적 복잡해 보인다. 코드를 더 잘 이해하면 다시보고하겠습니다. 2 부에 대한보다 쉬운 솔루션을 환영합니다. – Niehno

+0

@Niehno 복제가 없다면,'scan'이 변경을 감지하면 5 분마다 계산을 다시 시도하지 않을 것입니다. 왜냐하면 그것은 동일한 객체를 얻게 될 것이기 때문입니다. 동의합니다. 가장 효과적인 솔루션이 아니므로 불행히도 두 번째 스트림을 추가로 복잡하게 만들지 만 두 스트림 모두에 카운터를 연결할 수 있습니다. – concat