2017-11-17 13 views
2

enter image description hererxjs : 내 프런트 엔드에 대한 응답의 시리즈를 보낼 socket.io를 사용하고 Observables은

를 통해 응답을 주문하는 방법. 응답은 순차적 인 것으로 의도되었지만 socket.io에 의해 생성 된 연결에 따라 항상 올바른 순서 (https://github.com/josephg/ShareJS/issues/375)가 보장되지는 않습니다.

각 응답에 번호가있는 시퀀스 필드가 있다고 가정하면 (위 그림에서 숫자로 표시) 관찰 가능 목록은 이러한 응답을 순서대로 방출해야합니다.

응답이 순서대로 수신되지 않고 아무런 응답도받지 않고 일정 시간 (n)이 지나면 내 관찰 가능 목록에서 오류를 내고 프런트 엔드에 신호를 보내 연결을 재설정하고 싶습니다.

+1

일련 번호는 순차적 초기 시퀀스 번호가 알려져있다? –

+0

@TeddySterne - 예, 그림 앞에 0이 전달되었다고 가정하므로 프런트 엔드는 1을 기다리고 있음을 알고있었습니다. – ZackDeRose

답변

1

정말 좋은 문제입니다. 가장 중요한 부분이 주석 처리 된 스 니펫 아래

// mock ordered values 
 
const mockMessages = Rx.Observable.fromEvent(document.querySelector('#emit'), 'click') 
 
    .map((e, index) => ({ 
 
    index, 
 
    timestamp: e.timeStamp 
 
    })) 
 
    .delayWhen(() => Rx.Observable.timer(Math.random() * 2000)) // distort order 
 

 
// there is a lot of mutability in `keepOrder`, but all of it 
 
// is sealed and does not leak to outside environment 
 
const keepOrder = timeoutMs => stream => 
 
    Rx.Observable.defer(() => // need defer to support retries on error 
 
    stream.scan((acc, v) => { 
 
     acc.buffer.push(v) 
 
     acc.buffer.sort((v1, v2) => v1.index - v2.index) 
 
     return acc 
 
    }, { 
 
     lastEmitted: -1, 
 
     buffer: [] 
 
    }) 
 
    .mergeMap(info => { 
 
     const emission = [] 
 
     while (info.buffer.length && info.lastEmitted + 1 === info.buffer[0].index) { 
 
     emission.push(info.buffer.shift()) 
 
     info.lastEmitted += 1 
 
     } 
 
     return Rx.Observable.of(emission) 
 
    }) 
 
    .switchMap(emissions => { 
 
     if (!emissions.length) { // this condition indicates out of order 
 
     return Rx.Observable.timer(timeoutMs) 
 
      .mergeMapTo(Rx.Observable 
 
      .throw(new Error('ORDER_TIMEOUT'))) 
 
     } else { 
 
     return Rx.Observable.from(emissions) 
 
     } 
 
    }) 
 
) 
 

 

 
mockMessages 
 
    .do(x => console.log('mocked', x.index)) 
 
    .let(keepOrder(1000)) // decrease timeoutMs to increase error probablity 
 
    .do(x => console.log('ORDERED', x.index)) 
 
    .retryWhen(es => es 
 
    .do(e => console.warn('ERROR', e))) 
 
    .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script> 
 

 
<button id="emit">EMIT</button>