2014-07-11 6 views
3

buffer(closingSelector)과 같은 Bacon.js의 EventStream 값을 RxJava에서 버퍼링하려고합니다. "컨트롤러 스트림"(RxJava 메서드의 closingSelector)이 새 값을 방출하면 이벤트 버퍼가 플러시됩니다.Bacon.js가 다른 스트림으로 스트림 버퍼링을 제어합니다.

그래서 스트림 출력이 stream.bufferWithTimeOrCount과 비슷하지만 시간 간격이나 이벤트 카운트로 버퍼링을 제어하는 ​​대신 다른 스트림으로 버퍼링을 제어하려고합니다.

Bacon.js에서 이것을 쉽게 구현할 수 있습니까? 사용할 수

답변

1

Bacon.js에는 필요한 기능이 없으므로 bacon.js source을보고 holdWhen의 수정 된 버전을 작성했습니다.

Bacon.EventStream.prototype.bufferUntilValue = function(valve) { 
var valve_ = valve.startWith(false); 

    return this.filter(false).merge(valve_.flatMapConcat((function(_this) { 
    return function() { 
     return _this.scan([], (function(xs, x) { 
      return xs.concat(x); 
     }), { 
      eager: true 
     }).sampledBy(valve).take(1); 
    }; 
    })(this))); 
}; 

는 동작에서 볼이 jsFiddle을 확인하십시오.

+0

정말 좋은 접근! 내 솔루션 ('holdWhen'과'bufferWithTime'을 함께 사용)보다 깨끗합니다. 다음 밸브 이벤트가 도착할 때 각 밸브 이벤트가 'sampledBy (valve) .take (1)'로 버퍼 된 값을 반환하는 새로운 "스캐너"를 생성하는 방법을 이해하는 데 시간이 걸렸습니다. – attekei

+0

나는 해결책을 단순화했다. 여기를 보아라. [jsFiddle] (http://jsfiddle.net/7DeQy/) (나중에 나는 holdWhen 만 조금 수정했다. .filter (false) .merge (..)'등) – attekei

1

Bacon.holdWhen 이후에 대한 0.7.14 버퍼링 이벤트가 하나 하나 방출하지만 당신이 원하는 거의 무엇을 수행합니다

stream.holdWhen (밸브) 일시 정지 및 밸브의 마지막 이벤트 인 경우는 이벤트 스트림을 버퍼링 진실. 버퍼링 된 모든 이벤트는 밸브가 허위로 변하면 해제됩니다.

단일 이벤트로 버퍼 이벤트를 방출해야하는 경우

, 당신은 다음과 같이 뭔가를 시도 할 수 있습니다 :

// source streams 
var sourceObservable = Bacon.interval(1000); 
var closingSelector = new Bacon.Bus(); 

// Constructing a new Observable where we're going to keep our state. 
// 
// We need to keep track of two things: 
// - the buffer that is currently being filled, and 
// - a previous buffer that is being flushed. 
// The state will then look like this: 
// [ buffer, flushed] 
// where both buffer and flushed is an array of events from the source observable. 

// empty initial state 
var initialState = {buffer: [], flushed: []} 

// There are two operations on the state: appending a new element to the buffer 
// and flushing the current buffer: 

// append each event from the source observable to the buffer, 
// keeping flushed unchanged 
var appends = sourceObservable.map(function(e) { 
    return function(state) { 
     state.buffer.push(e); return state; 
    } 
}); 

// each event from the closingSelector replaces the `flushed` with 
// the `buffer`'s contents, inserting an empty buffer. 
var flushes = closingSelector.map(function(_) { 
    return function(state) { return {buffer: [], flushed: state.buffer} } 
}) 

// merge appends and flushes into a single stream and apply them to the initial state 
var ops = appends.merge(flushes) 
var state = ops.scan(initialState, function(acc, f) { return f(acc) }); 

// resulting stream of flushed events 
var flushed = state.sampledBy(closingSelector).map(function(state) { return state.flushed }) 

// triggered with `closingSelector.push({})` 
flushed.onValue(function(x) { console.log("flushed", x) }) 
0

stream.holdWhen(valve) 거의 정확하게 당신이 원하는 것 같습니다. 그것은 buffer(closingSelector)과 약간 다른 방식으로 작동합니다 : 항상 버퍼링하는 대신에 closingSelector에서 버퍼를 플러시하고, value 스트림의 마지막 값에 따라 버퍼링을 토글합니다. 그대로

어쩌면 당신은 holdWhen을 사용할 수 있지만 당신이 buffer(closingSelector)처럼 행동을하려는 경우, 당신은 다음과 같이 할 수있다 : 우리는 값 value 스트림이 이벤트를 생성 closingSelector에서 각 이벤트에

var result = sourceStream.holdWhen(closingSelector.flatMap(function(){ 
    return Bacon.fromArray([false, true]); 
}).toProperty(true)); 

truefalse, 즉 버퍼링을 해제 (플러시 트리거) 한 다음 즉시 다시 켭니다.

+0

사실 내 초기 솔루션은 매우 유사했습니다. [jsFiddle] (http://jsfiddle.net/RRWJf/)를 참조하십시오. 유일한 차이점은 출력을 값 배열로 원했기 때문에'bufferWithTime (1)'을 사용하여 결과를 수집했습니다. 그것은 직업을 수행하지만 더 나은 해결책이 있어야한다고 생각했습니다 :) – attekei