2017-10-11 4 views
2

내가 수신 버퍼 기능을 사용할 방출하기 : 버퍼가 마지막 발광 이후 갈 5 또는 30 초 크기에 도달하면 발광 (가입자에 게시)을 의미수신 .NET 힘 버퍼는

var source = new Subject<Price>(); 
var buffer = source 
    .Buffer(TimeSpan.FromSeconds(30), 5) 
    .Where(p => p.Any()); 

발생 .

하지만 우선 순위가 높은 순서 항목을 수신하는 경우와 같이 필요할 때마다 내 보내야합니다. 그런 다음 그것을 관측 가능 (source.OnNext())에 추가하고 어떻게 든 그것을 내보내도록합니다 (즉, 버퍼의 모든 요소를 ​​반환하고 지우는 것입니다).

var flusher = new Subject<Price>(); 
var closing = flusher.Select(x => new List<Price> {x}); 
var query = buffer.Merge(closing).Subscribe(something); 

및 flusher.OnNext (highPriorityItem)를 호출하고 나는 그것이 방출해야합니다 :

은 내가 다음 코드를 추가 할 수 있다는 것을 알고.

하지만이 경우 두 개의 서로 다른 방출로 두 개의 독립적 인 시퀀스가 ​​있습니다. 버퍼가 꽉 찼거나 특정 항목이 순서대로 나타나면 하나를 내 보내야합니다.

Force flush count-type Observable.Buffer c#Force flush to Observable.Buffer c#

+0

'[...] me' 어떤 특별한 이유에 적합하지 않는 것 : 여기

는 방법은? 후자는 당신이 요구하는 것을 정확하게 수행하는 것 같습니다 ... – decPL

+0

버퍼 대신 단위 관련 정의와 타이머가 있습니다. 나는 그들이/내 문제를 해결하는 방법을 모르거나 이해하지 못한다. – brolly87

+0

그것들을 구현하려 했습니까? 그냥'Unit'을'Price' 클래스로 대체하십시오. 문제가 있다면 문제를 기술하고 누군가가 도움을 줄 수 있기를 바랍니다. – decPL

답변

2

편집에 적합하지 않는 것 : @Enigmativity 절대적으로 정확, 그의 대답을 참조하십시오. 잘하면이 사고 과정을 여기에서 결정하는 것이 더 쉽기 때문에이 하나를 그대로 남겨 둡니다.

시도 뭔가를 다음과 같이

var input = new Subject<Price>(); //your input observable 

var flush = new Subject<long>(); //used to manually flush the 'buffer' for important prices 
var timeBuffer 
    = Observable.Timer(TimeSpan.FromSeconds(10)); //controls the time-based part of 'buffer' 
var sizeBuffer = input.Buffer(5).Select(l => 0L); //controls the size-based part of 'buffer' 

var bufferedInput = input.Window(()=>Observable.Merge(timeBuffer, sizeBuffer, flush)) 
         .SelectMany(w => w.ToList()) 
         .Subscribe(w => DO_SOMETHING_WITH_PRICES(w)); 

//Flush on important price (NOTE - order of the two subscriptions matter) 
input.Where(p => p.IS_IMPORTANT).Subscribe(p => flush.OnNext(0L)); 
+0

이것은 거의 괜찮습니다. 그러나 OnNext를 통해 중요한 가격을 밀어 넣으면 버퍼의 모든 요소가 이 중요한 가격은 그렇지 않습니다. (버퍼가 5 또는 30 초 후에) 다음 라운드에서 방출 될 것입니다. 첫 번째 방출의 일부가되고 싶습니다 – brolly87

+0

그래서 구독 순서가 중요하다고 썼습니다. 구독하기 전에 중요한 가격을 '플러시'하기 전에 '버퍼'에 가입해야합니다. – decPL

+1

감사합니다. 처음에는 무시했습니다. 이제 작동! – brolly87

4

나는 decPL가 ​​바로 여기에 기본적인 아이디어를 가지고 있지만 자신의 해결책이 안정되지라고 생각합니다. 관찰 할 수있는 input의 스케줄러에 따라 올바른 순서로 구독하더라도 예측할 수없는 결과가 발생할 수 있습니다. input에 대한 여러 독립적 인 구독이 있기 때문입니다. 구독을 한 번만 받으려면 .Publish(...) 전화를 통해이 모든 것을 푸시해야합니다.

구독이 처리 될 때 정리 방법이 필요합니다. 따라서 .Create(...) 호출을 통해 실행해야합니다.

var input = new Subject<Price>(); 

IObservable<IList<Price>> query = 
    input 
     .Publish(i => 
      Observable 
       .Create<IList<Price>>(o => 
       { 
        var timeBuffer = 
         Observable 
          .Timer(TimeSpan.FromSeconds(10.0)) 
          .Select(n => Unit.Default); 
        var flush = 
         i 
          .Where(p => p.IS_IMPORTANT) 
          .Select(n => Unit.Default); 
        var sizeBuffer = 
         i 
          .Buffer(5) 
          .Select(l => Unit.Default); 
        return 
         i 
          .Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush)) 
          .SelectMany(w => w.ToList()) 
          .Subscribe(o); 
       })); 

query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w)); 
+0

IS_IMPORTANT가 특별한 "flushing"을위한 유일한 경우라면, 당신은 또한 관찰 가능한 것으로서'flush' Subject를 대체 할 수 있습니까?'i.Where (p => p.IS_IMPORTANT) .Select (_ => Unit.Default) ')? – supertopi

+0

@supertopi - 잘 부탁드립니다. – Enigmativity