2017-12-13 25 views
0

다음과 같은 관찰 가능 항목을 만드는 데 문제가 있습니다.
미리 정의 된 값 배열을 받길 원합니다.
다른 것들로 필터링하고 개별 관찰 가능 항목으로 작업 할 수 있습니다. 는 이러한 필터링 관찰 가능한 병합하는 데 시간이 올 때
그리고는, 나는 원래여러 번 관찰 할 수있는 ReactiveX 필터링 및 병합

//Not sure the share is necessary, just thought it would tie it all together 
const input$ = Observable.from([0,1,0,1]).share(); 
const ones$ = input$.filter(n => n == 1); 
const zeroes$ = input$.filter(n => n == 0); 

const zeroesChanged$ = zeroes$.mapTo(2); 
const onesChanged$ = ones$.mapTo(3); 
const allValues$ = Observable.merge(onesChanged$,zeroesChanged$); 

allValues$.subscribe(n => console.log(n)); 
//Outputs 3,3,2,2 
//Expected output 3,2,3,2 

편집에서 순서를 유지하려면 : 내가 내 질문에 충분한 특정되지 않았습니다 죄송합니다. 부작용을 드라이버로 구분하는 cycleJS라는 라이브러리를 사용하고 있습니다. 그래서 내가 내주기에서 뭐하는 거지 것은 내가 그것을 위해 테스트를 작성하고 싶어 할 때이

export function socketCycle({ SOCKETIO }) { 
    const serverConnect$ = SOCKETIO.get('connect').map(serverDidConnect); 
    const serverDisconnect$ = SOCKETIO.get('disconnect').map(serverDidDisconnect); 
    const serverFailedToConnect$ = SOCKETIO.get('connect_failed').map(serverFailedToConnect); 
    return { ACTION: Observable.merge(serverConnect$, serverDisconnect$, serverFailedToConnect$) }; 
} 

지금 내 문제가 발생합니다. 나는 잘못된 문제 (농담을 사용)에서 일한 다음과 함께 시도했다.

const inputConnect$ = Observable.from(['connect', 'disconnect', 'connect', 'disconnect']).share(); 
const expectedOutput$ = Observable.from([ 
    serverDidConnect(), 
    serverDidDisconnect(), 
    serverDidConnect(), 
    serverDidDisconnect(), 
]); 
const socketIOMock = { 
    get: (evt) => { 
    if (evt === 'connect') { 
     return inputConnect$.filter(s => s === 'connect'); 
    } else if (evt === 'disconnect') { 
     return inputConnect$.filter(s => s === 'disconnect'); 
    } 
    return Observable.empty(); 
    }, 
}; 
const { ACTION } = socketCycle({ SOCKETIO: socketIOMock }); 
Observable.zip(ACTION, expectedOutput$).subscribe(
    ([output, expectedOutput]) => { expect(output).toEqual(expectedOutput); }, 
    (error) => { expect(true).toBe(false) }, 
() => { done(); }, 
); 

어쩌면 내가 테스트 할 수있는 또 다른 방법이 있을까? 코드 아래

+0

'input $ .map (n => n + 2)'이며 예상 한 결과를 자세히 살펴야합니다. – cartant

+2

각 Observable에서의 "작업"이 한 번에 하나의 값만을 고려한다면, @cartant는 제안한 것처럼 이것을 필터링, 작업, 병합으로 보지 말고 바로 매핑하는 것으로보아야합니다. 가장 일반적인 경우, 매핑 함수는 if-else를 사용하여 값을 다른 동작으로 지정합니다. Observables가 반환되면 [concatMap] (https://www.learnrxjs.io/operators/transformation/concatmap.html)을 통해 주문을 보존 할 수 있습니다. – concat

+0

실제로 맵을 분리하고 소스와 리턴 싱크를 취하는 다른 함수를 병합했습니다. 입력이 비동기 적으로 발생하면 잘 작동하지만 테스트를 시도 할 때 미리 입력이 지정되어있는 곳에서 주문 보존 ... – user1898027

답변

1

는 다른 딸의 요소 사이의 타이밍을 보장 스트림 실제로 배열을 조작 할 rxjs를 사용할 필요 없습니다 의해 파괴됨. 특히, connect 이벤트가 항상 이벤트 소스에서 disconnect 이벤트 이전에 오더라도, connect Observable의 이벤트가 항상 해당 이벤트 항목 인 disconnect Observable보다 먼저 나오는 것은 아닙니다. 정상적인 시간대에서는이 레이스 컨디션은 매우 드물 긴하지만 위험 할 수 있습니다.이 테스트는 최악의 경우를 보여줍니다.

표시된 좋은 기능은 이벤트와 처리기의 결과 사이의 매퍼 (mapper)에 불과하다는 것입니다.

const event_handlers = new Map({ 
'connect': serverDidConnect, 
'disconnect': serverDidDisconnect, 
'connect_failed': serverFailedToConnect 
}); 
const ACTION = input$.map(event_handlers.get.bind(event_handlers)); 

주의 할 : 당신이 이벤트 유형을 통해 일반적으로이 모델을 계속 할 수있는 경우에, 당신은 심지어 표현력 혜택을 일반 데이터 구조에 매핑을 인코딩 할 수 있습니다 당신이 딸 스트림을 통해 감소 된 경우 (또는 다른 이전 값 (예 : debounceTime)을 고려하면 리팩터링은 그리 간단하지 않으며 "보존 순서"라는 새로운 정의에 의존합니다. 대개의 경우, reduce + 더 복잡한 누적기로 재생산하는 것은 여전히 ​​가능할 것입니다.

0

당신에게 욕망 결과를 줄 수있을 수도 있지만 이럴 스트림이 분할되면

Rx.Observable.combineLatest(
    Rx.Observable.from([0,0,0]), 
    Rx.Observable.from([1,1,1]) 
).flatMap(value=>Rx.Observable.from(value)) 
.subscribe(console.log) 
+0

미안하지만 나는 내가 원했던 것을 명확히 밝히지 않았다. 내가 분명히 원래 게시물에 편집을 만들었습니다 – user1898027

+0

.beheaviour의 기본 beheaviour는 내가 병합과 함께했으나 zip으로 작동하는 테스트에서 비동기를 지원하지 않습니다. a = Rx.Observable.from ([1,1, 1]) letb = Rx.Observable.from ([2,2,2]) Rx.Observable.zip (a, b) .subscribe (console.log) –